source: trunk/Mars/mcore/zofits.h@ 17288

Last change on this file since 17288 was 17288, checked in by lyard, 11 years ago
Bugfix to be able to reuse the same zofits object to write more than one file
File size: 38.2 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
23 template<class S>
24 struct QueueMin : std::list<S>
25 {
26 typename std::list<S>::iterator begin()
27 {
28 return min_element(std::list<S>::begin(), std::list<S>::end());
29 }
30 };
31
32
    //catalog types
    // For every tile and every column the catalog records the compressed block
    // size and its offset inside the heap. The struct is written to disk
    // verbatim (2 x int64 per entry), hence the packed attribute.
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;  ///< Size of this column in the tile
        int64_t second; ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef std::vector<CatalogEntry> CatalogRow;  ///< one entry per column of a single tile
    typedef std::list<CatalogRow> CatalogType;     ///< one row per tile of the file
43
44
45 /// Parameters required to write a tile to disk
46 struct WriteTarget
47 {
48 bool operator < (const WriteTarget& other) const
49 {
50 return tile_num < other.tile_num;
51 }
52
53 WriteTarget() { }
54 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
55
56 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
57 uint32_t size; ///< Size to write
58 std::shared_ptr<char> data; ///< Memory block to write
59 };
60
61
62 /// Parameters required to compress a tile of data
63 struct CompressionTarget
64 {
65 CompressionTarget(CatalogRow& r) : catalog_entry(r)
66 {}
67
68 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
69 std::shared_ptr<char> src; ///< Original data
70 std::shared_ptr<char> transposed_src; ///< Transposed data
71 WriteTarget target; ///< Compressed data
72 uint32_t num_rows; ///< Number of rows to compress
73 };
74
75public:
    /// Static setter/getter for the default number of compression threads.
    /// NOTE(review): the original comment states "-1 means all available physical
    /// cores", but the condition int32_t(_n)<-1 means the no-argument call
    /// (_n=-2) *stores* -2, and non-negative defaults can never be stored.
    /// SetNumThreads later clamps the huge unsigned value to cores-2, so the
    /// net effect is "all cores minus two" — confirm intended semantics before
    /// changing this condition.
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
    /// Default maximum memory usable by the compression buffers;
    /// multiplied by 1000 at the call sites before being handed to the pool.
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    /// Default number of pre-reserved catalog tiles.
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    /// Default number of rows grouped together into one compressed tile.
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
81
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory can be used by the compression buffers
    ///        (multiplied by 1000 before being handed to the memory pool)
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
95
    /// Constructor that also opens the target file.
    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory can be used by the compression buffers
    ///        (multiplied by 1000 before being handed to the memory pool)
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
110
    //initialization of member variables (shared by both constructors)
    /// @param nt number of tiles (0 is clamped to 1)
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM (bytes) to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0; // no partial checksum word pending
        fNumQueues = 0;   // worker threads are configured later via SetNumThreads()

        fNumTiles = nt==0 ? 1 : nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth = 0;
        fCatalogOffset = 0;
        fCatalogSize = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = std::exception_ptr(); // no pending exception from worker threads
#endif
        fErrno = 0;
    }
133
    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        // with zero worker threads, queue items are executed synchronously
        // from the calling thread
        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no writing error (in the writing of the data) has occurred so far
        fErrno = 0;

        return good();
    }
164
165 /// open a new file.
166 /// @param filename the name of the file
167 /// @param Whether or not the name of the extension should be added or not
168 void open(const char* filename, bool addEXTNAMEKey=true)
169 {
170 ofits::open(filename, addEXTNAMEKey);
171
172 //add compression-related header entries
173 SetBool( "ZTABLE", true, "Table is compressed");
174 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
175 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
176 SetInt( "ZPCOUNT", 0, "");
177 SetInt( "ZHEAPPTR", 0, "");
178 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
179 SetInt( "THEAP", 0, "");
180 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
181 SetFloat("ZRATIO", 0, "Compression ratio");
182 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
183
184 fCatalogSize = 0;
185 fRealRowWidth = 0;
186 fCatalogOffset = 0;
187 fCatalogSize = 0;
188 fCheckOffset = 0;
189
190 fRealColumns.clear();
191 fCatalog.clear();
192 fCatalogSum.reset();
193 fRawSum.reset();
194 }
195
    /// Super method. Does nothing as zofits does not know about DrsOffsets;
    /// subclasses that handle DRS calibration data override this hook.
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
202
    /// Returns the number of bytes per uncompressed row
    /// (i.e. the cnt value that WriteRow expects)
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
209
    /// Write the data catalog to the reserved area at fCatalogOffset.
    /// The entries are byte-swapped to big endian (FITS convention) first,
    /// and the file pointer is restored afterwards.
    /// @return the state of the file
    bool WriteCatalog()
    {
        // one catalog row = a (size, offset) int64 pair per column
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // zero-fill the reserved-but-unused tail of the catalog
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk at the catalog position
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        // restore the original file position if we moved
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum (recomputed from scratch on every call)
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
250
    /// Applies the DrsOffsets calibration to one row of data, in place.
    /// Does nothing here as zofits knows nothing about drsoffsets;
    /// subclasses override this hook (called from CompressBuffer).
    virtual void DrsOffsetCalibrate(char* )
    {

    }
256
257 CatalogRow& AddOneCatalogRow()
258 {
259 // add one row to the catalog
260 fCatalog.emplace_back(CatalogRow());
261 fCatalog.back().resize(fTable.num_cols);
262 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
263 *it = CatalogEntry(0,0);
264
265 fCatalogSize++;
266
267 return fCatalog.back();
268 }
269
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal GetBytesPerRow())
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        // zero the first and last four bytes of the checksum buffer so the
        // padding around [rawOffset, rawOffset+fRealRowWidth) cannot pollute the sum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // tile not complete yet: nothing to compress, just report thread errno
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // use the least occupied queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        // NOTE(review): unlike the other error paths in this file, this throw is
        // not guarded by #ifdef __EXCEPTIONS — confirm whether a gLog fallback
        // is needed for -fno-exceptions builds
        if (!imin->emplace(InitNextCompression()))
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");

        errno = fErrno;
        return errno==0;
    }
339
    /// update the real number of rows in the header and flush it to disk.
    /// NAXIS2 counts catalog (tile) rows, ZNAXIS2 the uncompressed table rows.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
347
    /// Setup the environment to compress yet another tile of data:
    /// hands the currently filled row buffer to a new compression target and
    /// installs a fresh buffer for the incoming rows.
    /// @return the compression target hosting the produced parameters
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }
370
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Groups of shrink_factor consecutive tiles are merged: row k of the new
    /// catalog keeps the old row k*shrink_factor, surplus rows are dropped and
    /// fNumRowsPerTile is scaled accordingly.
    /// @return the shrink factor applied (1 means nothing was done)
    const uint32_t ShrinkCatalog()
    {
        //add empty rows to get either the target number of rows, or a multiple of the allowed size
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog ! entry 0 keeps its own row
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            // advance a second iterator to the row to keep
            // (std::list has no random access)
            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the too many entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords: one catalog row now covers shrink_factor tiles
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK", shrink_factor);

        return shrink_factor;
    }
422
    /// close an open file: drains the worker queues, flushes the last partial
    /// tile, finalizes catalog offsets and header keywords, verifies the
    /// checksum and closes the underlying stream.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any) synchronously — the queues
        //are stopped, so switch them to prompt execution first
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        // fill in the heap offsets of every catalog entry: each tile starts
        // with a TileHeader, columns follow one another; zero-sized (empty)
        // columns get offset 0 by convention
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        // merge catalog rows if more tiles were written than had been reserved
        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        std::ofstream::close();

        // release the row buffer back to the memory pool
        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrunk
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
520
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// @return whether the column could be added
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
527
528 /// Overload of the simplified compressed version
529 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
530 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
531 {
532 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
533 return false;
534
535 const size_t size = SizeFromType(typechar);
536
537 Table::Column col;
538 col.name = name;
539 col.type = typechar;
540 col.num = cnt;
541 col.size = size;
542 col.offset = fRealRowWidth;
543
544 fRealRowWidth += size*cnt;
545
546 fRealColumns.emplace_back(col, comp);
547
548 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
549 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
550
551 return true;
552 }
553
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }

    /// Configure the number of compression queues (worker threads).
    /// Must be called while the file is closed.
    /// @param num requested thread count; values above the detected core count
    ///        are clamped to cores-2 (one core for the main thread, one for the writer)
    /// @return false if the file is currently open, true otherwise
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // at least one queue always exists; with fNumQueues==0 it later runs
        // in prompt (synchronous) mode instead of spawning a thread
        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }
587
    /// Get/set the number of catalog tiles pre-reserved at the start of the file
    uint32_t GetNumTiles() const { return fNumTiles; }
    void SetNumTiles(uint32_t num) { fNumTiles=num; }
590
591protected:
592
    /// Allocates the required objects.
    void reallocateBuffers()
    {
        // one pool chunk must hold a full tile plus per-column block headers,
        // the tile header and 8 spare bytes used for checksum alignment
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        // padded past the next multiple of 4 bytes for the 32-bit checksum words
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
602
    /// Actually does the writing to disk (and checksuming).
    /// The first 4 bytes of src are reserved padding: checksuming may extend
    /// into them on the left to keep the 32-bit checksum words aligned across
    /// consecutive writes; the payload itself starts at src+4.
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        //(previous write ended on a non-multiple of 4: realign with zero bytes)
        if (fCheckOffset != 0)
        {
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        //(pad with zeros up to the next multiple of 4 bytes)
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the misalignment for the next call
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
639
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return always true (required by the Queue interface)
    bool CompressBuffer(const CompressionTarget& target)
    {
        // the last tile of the file may contain fewer than fNumRowsPerTile rows
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        // calibrate data if required (no-op in zofits itself, see DrsOffsetCalibrate)
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // forward the exception to the main thread; in prompt (0-thread)
            // mode re-throw immediately since we ARE the main thread
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }
680
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    /// @return false if this is not the next tile due — the QueueMin container
    ///         will then re-offer the smallest pending tile later — true otherwise
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            // on a failed write, remember errno so the main thread can report it
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
709
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog entries (size/offset per column) to fill in
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // the last tile of the file may contain fewer than fNumRowsPerTile rows
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            // empty columns occupy no heap space
            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            // apply the column's processing steps in sequence
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    // in-place preprocessing: produces no output bytes itself
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth ! (undo the in-place preprocessing before the raw copy)
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally write the tile header (row count + total size) at the very start
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
798
799 /// Transpose a tile to a new buffer
800 /// @param src buffer hosting the regular, row-ordered data
801 /// @param dest the target buffer that will receive the transposed data
802 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
803 {
804 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
805
806 //copy the tile and transpose it
807 for (uint32_t i=0;i<fRealColumns.size();i++)
808 {
809 switch (fRealColumns[i].block_head.getOrdering())
810 {
811 case FITS::kOrderByRow:
812 //regular, "semi-transposed" copy
813 for (uint32_t k=0;k<thisRoundNumRows;k++)
814 {
815 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
816 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
817 }
818 break;
819
820 case FITS::kOrderByCol:
821 //transposed copy
822 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
823 for (uint32_t k=0;k<thisRoundNumRows;k++)
824 {
825 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
826 dest += fRealColumns[i].col.size;
827 }
828 break;
829 };
830 }
831 }
832
833 /// Specific compression functions
834 /// @param dest the target buffer
835 /// @param src the source buffer
836 /// @param size number of bytes to copy
837 /// @return number of bytes written
838 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
839 {
840 memcpy(dest, src, size);
841 return size;
842 }
843
    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        // encode chunk by chunk; the compressed size of each chunk is stored
        // up front as a uint32_t so the reader can split the stream again
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        //(the caller falls back to raw storage in that case anyway)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
889
890 /// Applies Thomas' DRS4 smoothing
891 /// @param data where to apply it
892 /// @param numElems how many elements of type int16_t are stored in the buffer
893 /// @return number of bytes modified
894 uint32_t applySMOOTHING(char* data, uint32_t numElems)
895 {
896 int16_t* short_data = reinterpret_cast<int16_t*>(data);
897 for (int j=numElems-1;j>1;j--)
898 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
899
900 return numElems*sizeof(int16_t);
901 }
902
903 /// Apply the inverse transform of the integer smoothing
904 /// @param data where to apply it
905 /// @param numElems how many elements of type int16_t are stored in the buffer
906 /// @return number of bytes modified
907 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
908 {
909 int16_t* short_data = reinterpret_cast<int16_t*>(data);
910 for (uint32_t j=2;j<numElems;j++)
911 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
912
913 return numElems*sizeof(uint16_t);
914 }
915
916
917
    //thread related stuff
    MemoryManager fMemPool;     ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;     ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads); -1 while none written

    std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin pops the smallest pending tile first

    // catalog related stuff
    CatalogType fCatalog;     ///< Catalog for this file
    uint32_t fCatalogSize;    ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;       ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile; ///< Number of rows per tile
    off_t fCatalogOffset;     ///< Offset of the catalog from the beginning of the file (0 until first WriteCatalog)

    // checksum related stuff
    Checksum fCatalogSum; ///< Checksum of the catalog
    Checksum fRawSum;     ///< Raw sum (specific to FACT)
    int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum (carries 4-byte misalignment across writes)

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;           ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                     ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer;            ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< propagate errno to main thread
958};
959
960#endif
Note: See TracBrowser for help on using the repository browser.