source: trunk/Mars/mcore/zofits.h@ 17258

Last change on this file since 17258 was 17258, checked in by tbretz, 11 years ago
Replaced calls to post by emplace. Therefore implemented a new constructor of WriteTarget to simplify the call. Removed some parts from the comments. Instead of throwing an exception if writing fails, errno is propagated to WriteRow. Instead of throwing an exception for 0 tiles, use 1 instead; this is not critical. Check for possible exceptions from the queues in every WriteRow call (we don't want to process a whole file just to realize at the close() call that something is wrong). Added a few more const qualifiers; replaced ostringstream by to_string where appropriate for performance reasons.
File size: 39.1 KB
#ifndef FACT_zofits
#define FACT_zofits

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#else
using namespace std;
#endif

class zofits : public ofits
{
    /// Override of begin() so that it returns the smallest item in the list instead of the true beginning
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };


    // catalog types
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;  ///< Size of this column in the tile
        int64_t second; ///< Offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;
    typedef list<CatalogRow>     CatalogType;


    /// Parameters required to write a tile to disk
    struct WriteTarget
    {
        bool operator < (const WriteTarget& other) const
        {
            return tile_num < other.tile_num;
        }

        WriteTarget() { }
        WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }

        uint32_t tile_num;     ///< Tile index of the data (to make sure that they are written in the correct order)
        uint32_t size;         ///< Size to write
        shared_ptr<char> data; ///< Memory block to write
    };


    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow& catalog_entry;       ///< Reference to the catalog entry to deal with
        shared_ptr<char> src;            ///< Original data
        shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget target;              ///< Compressed data
        uint32_t num_rows;               ///< Number of rows to compress
    };

public:
    /// Static setter for the default number of threads to use. -1 means all available physical cores
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }

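    /// Illustrative sketch (not part of the API): the four static functions above
    /// act as process-wide defaults picked up by objects constructed afterwards.
    /// The values are made-up examples, not recommendations:
    /// \code
    ///     zofits::DefaultNumThreads(2);       // two compression worker threads
    ///     zofits::DefaultMaxMemory(2000000);  // 2000000 kB, i.e. about 2 GB of buffers
    ///     zofits::DefaultNumRowsPerTile(50);  // group 50 rows per compressed tile
    ///     zofits out("example.fits.fz");      // picks up the defaults set above
    /// \endcode
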
    /// Constructors
    /// @param numTiles how many data groups should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
    zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
    zofits(const char* fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
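
    /// Minimal end-to-end usage sketch, assuming a single raw (uncompressed)
    /// column of 1000 doubles; file and column names are made up:
    /// \code
    ///     zofits file;
    ///     file.open("sample.fits.fz");
    ///     file.AddColumn(1000, 'D', "Data", "counts", "example column");
    ///     file.WriteTableHeader("DATA");
    ///     vector<double> row(1000, 0.);
    ///     for (int i=0; i<10; i++)
    ///         if (!file.WriteRow(row.data(), row.size()*sizeof(double)))
    ///             break;  // errno holds the error of the failed write
    ///     file.close();
    /// \endcode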

    /// Initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM (in bytes) to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues   = 0;

        fNumTiles       = nt==0 ? 1 : nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth  = 0;
        fCatalogOffset = 0;
        fCatalogSize   = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = exception_ptr();
#endif
        fErrno = 0;
    }

    /// Write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }

    /// Open a new file.
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the EXTNAME keyword should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE",   true,            "Table is compressed");
        SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
        SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
        SetInt(  "ZPCOUNT",  0,               "");
        SetInt(  "ZHEAPPTR", 0,               "");
        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt(  "THEAP",    0,               "");
        SetStr(  "RAWSUM",   "         0",    "Checksum of raw little endian data");
        SetFloat("ZRATIO",   0,               "Compression ratio");
        SetInt(  "ZSHRINK",  1,               "Catalog shrink factor");

        fCatalogSize = 0;
        fCatalog.clear();
        fRawSum.reset();
    }

    /// Super method. Does nothing, as zofits does not know about DrsOffsets.
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    /// Write the data catalog
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
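
    /// Worked example of the catalog geometry implied by the code above: with
    /// fTable.num_cols == 10, one catalog row occupies 10*2*sizeof(uint64_t) =
    /// 160 bytes; with fNumTiles == 1000 reserved tiles the catalog spans
    /// 160000 bytes between the table header and the heap, zero-padded beyond
    /// the fCatalogSize rows actually filled.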

    /// Applies the DrsOffsets calibration to the data. Does nothing, as zofits knows nothing about drsoffsets.
    virtual void DrsOffsetCalibrate(char*)
    {
    }

    CatalogRow& AddOneCatalogRow()
    {
        // add one row to the catalog
        fCatalog.emplace_back(CatalogRow());
        fCatalog.back().resize(fTable.num_cols);
        for (auto it=fCatalog.back().begin(); it!=fCatalog.back().end(); it++)
            *it = CatalogEntry(0,0);

        fCatalogSize++;

        return fCatalog.back();
    }

    /// Write one row of data.
    /// Note: in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before errno was set).
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif
        //copy the current row to the pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksumming.
        //Ideally this should be moved to the threads.
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksummed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        DrsOffsetCalibrate(target_location);

        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        CompressionTarget compress_target(AddOneCatalogRow());
        SetNextCompression(compress_target);

        //no worker threads: do everything inline
        if (fNumQueues == 0)
        {
            const uint64_t size_to_write = CompressBuffer(compress_target);

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");

            // The correct 'errno' is set, because it is the main thread.
            return good();
        }

        //pick the least loaded queue (if all queues are empty, use queue 0)
        uint32_t min_index = 0;
        uint32_t min_size  = numeric_limits<uint32_t>::max();
        uint32_t current_index = 0;

        for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
        {
            if (it->size() < min_size)
            {
                min_index = current_index;
                min_size  = it->size();
            }
            current_index++;
        }

        if (!fCompressionQueues[min_index].emplace(compress_target))
            throw runtime_error("The compression queues are not started. Did you close the file before writing this row?");

        errno = fErrno;
        return errno==0;
    }
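
    /// As the note above says, with worker threads the stream state lags behind
    /// the actual writing; a sketch of the corresponding error handling (buffer
    /// and loop bounds are illustrative):
    /// \code
    ///     for (size_t i=0; i<num_rows; i++)
    ///         if (!file.WriteRow(buffer, file.GetBytesPerRow()))
    ///         {
    ///             // errno was propagated from the writer thread
    ///             break;
    ///         }
    /// \endcode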

    /// Update the real number of rows
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    /// Set up the environment to compress yet another tile of data
    /// @param target the struct where to host the produced parameters
    void SetNextCompression(CompressionTarget& target)
    {
        //fill up the compression target
        target.src            = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows       = fTable.num_rows;

        //fill up the write-to-disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.data     = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
    }

    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    void ShrinkCatalog()
    {
        //add empty rows until the catalog size is a multiple of the allowed size
        //(the loop bound is re-evaluated as rows are added)
        for (uint32_t i=0; i<fCatalogSize%fNumTiles; i++)
            AddOneCatalogRow();

        //did we write more rows than the catalog can host?
        if (fCatalogSize > fNumTiles)
        {
            const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above

            //shrink the catalog!
            uint32_t entry_id = 1;
            auto it = fCatalog.begin();
            it++;
            for (; it != fCatalog.end(); it++)
            {
                if (entry_id >= fNumTiles) break;

                const uint32_t target_id = entry_id*shrink_factor;

                auto jt = it;
                for (uint32_t i=0; i<target_id-entry_id; i++)
                    jt++;

                *it = *jt;

                entry_id++;
            }

            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
            //remove the surplus entries
            for (uint32_t i=0; i<num_tiles_to_remove; i++)
            {
                fCatalog.pop_back();
                fCatalogSize--;
            }
            //update header keywords
            fNumRowsPerTile *= shrink_factor;

            SetInt("ZTILELEN", fNumRowsPerTile);
            SetInt("ZSHRINK",  shrink_factor);
        }
    }
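
    /// Worked example of the shrinking above: with fNumTiles == 1000 reserved
    /// rows and fCatalogSize == 2300 written tiles, rows are appended until
    /// fCatalogSize reaches 3000 (the loop bound moves with each insertion),
    /// giving the exact shrink_factor 3000/1000 == 3; every third entry is then
    /// kept, 2000 entries are dropped, and ZTILELEN is multiplied by 3 so that
    /// readers can still locate every row.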

    /// Close an open file.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            //set the number of threads to zero before calling CompressBuffer
            const int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;

            const uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
        }

        AlignTo2880Bytes();

        //compute the heap size and update the offsets of the catalog entries
        int64_t heap_size         = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size         += sizeof(TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", to_string(fRawSum.val()));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }

    /// Overload of the ofits method. Just calls the zofits-specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        ostringstream str;
        str << "format of " << name << " [" << CommentFromType(typechar);

        SetStr("ZFORM"+to_string(fRealColumns.size()), to_string(cnt)+typechar, str.str());
        SetStr("ZCTYP"+to_string(fRealColumns.size()), "FACT", "Compression type: FACT");

        return true;
    }
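
    /// Illustrative sketch of registering a compressed column next to a raw one
    /// (names and sizes are made up, and the exact form of the FITS::Compression
    /// constructor is assumed here, as it is not defined in this file):
    /// \code
    ///     zofits file;
    ///     file.open("calib.fits.fz");
    ///     file.AddColumn(1, 'J', "EventNum", "", "event identifier");
    ///     const FITS::Compression comp({ FITS::kFactSmoothing, FITS::kFactHuffman16 },
    ///                                  FITS::kOrderByCol);
    ///     file.AddColumn(comp, 1440, 'I', "Data", "mV", "calibrated samples");
    /// \endcode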

    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        //get the number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = thread::hardware_concurrency();
#endif
        // could not detect the number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }
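
    /// A short worked example of the clamping above: on a machine reporting 8
    /// hardware threads, SetNumThreads(16) keeps 8-2 == 6 compression queues
    /// (one core is left for the main thread, one for the writer thread), while
    /// on a machine reporting 1 or 2 cores any oversized request collapses to 1.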

protected:

    /// Allocates the required objects.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksumming
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming
    }

    /// Actually does the writing to disk (and checksumming)
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left?
        if (fCheckOffset != 0)
        {
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
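
    /// Worked example of the 4-byte alignment bookkeeping above: a first call
    /// with sizeToWrite == 10 and fCheckOffset == 0 pads 2 zero bytes on the
    /// right (extraBytes == 2) and checksums 12 bytes, leaving fCheckOffset ==
    /// (4-2)%4 == 2; the next buffer is therefore extended 2 bytes to the left,
    /// so the odd tail and the following head fall into the same 4-byte word,
    /// as the FITS checksum convention requires.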

    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;

        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        //for (uint32_t i=0; i<thisRoundNumRows; i++)
        //{
        //    char* target_location = target.src.get()->get() + fRealRowWidth*i;
        //    cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
        //    DrsOffsetCalibrate(target_location);
        //}

#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        fWriteToDiskQueue.emplace(target.target, compressed_size);

        // if used by the queue, always return true as the elements are not ordered
        return 1;
    }

    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk: remember errno for the main thread
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }

    /// Compress a given buffer based on its source and destination
    /// (src cannot be const, as applySMOOTHING is done in place)
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog row to be filled with sizes and offsets
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            Compression& head = fRealColumns[i].block_head;

            //remember where the block header starts; it is written once the compressed size is known
            const uint64_t previousOffset = compressedOffset;

            //skip the header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0; j<head.getNumProcs(); j++)
            {
                switch (head.getProc(j))
                {
                case kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case kFactHuffman16:
                    if (head.getOrdering() == kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size is larger than the uncompressed one
            if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {
                //if so, set the flag and redo it uncompressed
                //de-smooth!
                if (head.getProc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                Compression he;

                compressedOffset  = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        const TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }

    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case kOrderByRow:
                //regular, "semi-transposed" copy
                for (uint32_t k=0; k<thisRoundNumRows; k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case kOrderByCol:
                //transposed copy
                for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                    for (uint32_t k=0; k<thisRoundNumRows; k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            }
        }
    }
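
    /// Layout sketch for the two orderings above, for one column with num == 2
    /// elements (a,b) in a tile of 3 rows (indices 0..2):
    /// \code
    ///     kOrderByRow: a0 b0 a1 b1 a2 b2   // row after row, "semi-transposed"
    ///     kOrderByCol: a0 a1 a2 b0 b1 b2   // element after element, transposed
    /// \endcode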

    /// Specific compression functions
    /// @param dest the target buffer
    /// @param src the source buffer
    /// @param size number of bytes to copy
    /// @return number of bytes written
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    /// Do Huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data element
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, the Huffman encoder does not work (and has no point).
        //Just return a size larger than the uncompressed one to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types" << endl;
            return 0;
#endif
        }

        uint32_t huffmanOffset = 0;
        for (uint32_t j=0; j<numRowElems; j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    /// Applies Thomas' DRS4 smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (int j=numElems-1; j>1; j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }

    /// Apply the inverse transform of the integer smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (uint32_t j=2; j<numElems; j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }
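
    /// Worked example of the two transforms above on the int16_t sequence
    /// { 10, 12, 14, 17, 20 }:
    /// \code
    ///     applySMOOTHING:   { 10, 12, 14-(12+10)/2, 17-(14+12)/2, 20-(17+14)/2 }
    ///                     = { 10, 12, 3, 4, 5 }
    ///     UnApplySMOOTHING: adds the same integer averages back in ascending
    ///                       order, restoring { 10, 12, 14, 17, 20 } exactly.
    /// \endcode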


    //thread related stuff
    MemoryManager fMemPool;     ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;     ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    vector<Queue<CompressionTarget>> fCompressionQueues;         ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)

    // catalog related stuff
    CatalogType fCatalog;     ///< Catalog for this file
    uint32_t fCatalogSize;    ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;       ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile; ///< Number of rows per tile
    off_t fCatalogOffset;     ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum; ///< Checksum of the catalog
    Checksum fRawSum;     ///< Raw sum (specific to FACT)
    int32_t fCheckOffset; ///< Offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression information
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;      ///< the regular column entry
        Compression block_head; ///< the compression data associated with that column
    };
    vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                ///< Width in bytes of one uncompressed row
    shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    vector<char> fRawSumBuffer;            ///< Buffer used for checksumming the incoming data, before compression

#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< Exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< Propagate errno to the main thread
};

#ifndef __MARS__
}; //namespace std
#endif

#endif