source: trunk/Mars/mcore/zofits.h@ 17289

Last change on this file since 17289 was 17289, checked in by tbretz, 11 years ago
Added missing Mars style exception handling.
File size: 38.4 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
23 template<class S>
24 struct QueueMin : std::list<S>
25 {
26 typename std::list<S>::iterator begin()
27 {
28 return min_element(std::list<S>::begin(), std::list<S>::end());
29 }
30 };
31
32
    //catalog types
    /// One catalog cell: size and offset of one column's compressed block
    /// within one tile. Packed because whole rows of these entries are
    /// byte-swapped and written verbatim into the FITS catalog (WriteCatalog).
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;  ///< Size of this column in the tile
        int64_t second; ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef std::vector<CatalogEntry> CatalogRow;  ///< one row per tile: one entry per column
    typedef std::list<CatalogRow> CatalogType;     ///< the whole catalog: one row per tile
43
44
45 /// Parameters required to write a tile to disk
46 struct WriteTarget
47 {
48 bool operator < (const WriteTarget& other) const
49 {
50 return tile_num < other.tile_num;
51 }
52
53 WriteTarget() { }
54 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
55
56 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
57 uint32_t size; ///< Size to write
58 std::shared_ptr<char> data; ///< Memory block to write
59 };
60
61
62 /// Parameters required to compress a tile of data
63 struct CompressionTarget
64 {
65 CompressionTarget(CatalogRow& r) : catalog_entry(r)
66 {}
67
68 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
69 std::shared_ptr<char> src; ///< Original data
70 std::shared_ptr<char> transposed_src; ///< Transposed data
71 WriteTarget target; ///< Compressed data
72 uint32_t num_rows; ///< Number of rows to compress
73 };
74
75public:
    /// static setter for the default number of threads to use. -1 means all available physical cores
    /// NOTE(review): with the guard `int32_t(_n)<-1`, the parameterless getter call
    /// (default _n=-2) also stores -2; the resulting huge unsigned value is later
    /// clamped by SetNumThreads() to (cores-2). Looks intentional but fragile --
    /// confirm before touching the sentinel logic.
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
    /// default maximum memory usable by the compression buffers
    /// (apparently in kB: the constructors multiply this by 1000 -- TODO confirm unit)
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    /// default number of catalog tiles to pre-reserve
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    /// default number of rows grouped into one compression tile
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
81
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        (NOTE(review): the value is multiplied by 1000 below, which suggests the
    ///        parameter is actually in kB -- confirm the intended unit)
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        (same unit caveat as above)
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
110
    //initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues = 0;

        //at least one tile, even if the caller asked for none
        fNumTiles = nt==0 ? 1 : nt;
        //NOTE(review): unlike nt, rpt==0 is NOT guarded here although the value
        //is used as a divisor in WriteRow()/compressBuffer() -- confirm callers
        //always pass a positive row count.
        fNumRowsPerTile = rpt;

        fRealRowWidth = 0;
        fCatalogOffset = 0;
        fCatalogSize = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = std::exception_ptr();
#endif
        fErrno = 0;
    }
133
    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        //with zero worker queues, compression and writing run synchronously
        //on the calling thread (prompt execution)
        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no writing error (in the writing of the data) has occurred so far
        fErrno = 0;

        return good();
    }
164
165 /// open a new file.
166 /// @param filename the name of the file
167 /// @param Whether or not the name of the extension should be added or not
168 void open(const char* filename, bool addEXTNAMEKey=true)
169 {
170 ofits::open(filename, addEXTNAMEKey);
171
172 //add compression-related header entries
173 SetBool( "ZTABLE", true, "Table is compressed");
174 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
175 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
176 SetInt( "ZPCOUNT", 0, "");
177 SetInt( "ZHEAPPTR", 0, "");
178 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
179 SetInt( "THEAP", 0, "");
180 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
181 SetFloat("ZRATIO", 0, "Compression ratio");
182 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
183
184 fCatalogSize = 0;
185 fRealRowWidth = 0;
186 fCatalogOffset = 0;
187 fCatalogSize = 0;
188 fCheckOffset = 0;
189
190 fRealColumns.clear();
191 fCatalog.clear();
192 fCatalogSum.reset();
193 fRawSum.reset();
194 }
195
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (overridden by FACT-specific subclasses)
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
209
    /// Write the data catalog
    /// Byte-swaps the catalog, zero-pads it to the full reserved size, writes
    /// it at its reserved position (remembered on the first call) and then
    /// restores the stream position.
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        //zero the reserved rows that have not been filled yet
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
250
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// @param (unnamed) pointer to one row of data; ignored here, used by subclasses
    virtual void DrsOffsetCalibrate(char* )
    {

    }
256
257 CatalogRow& AddOneCatalogRow()
258 {
259 // add one row to the catalog
260 fCatalog.emplace_back(CatalogRow());
261 fCatalog.back().resize(fTable.num_cols);
262 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
263 *it = CatalogEntry(0,0);
264
265 fCatalogSize++;
266
267 return fCatalog.back();
268 }
269
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write; must equal GetBytesPerRow()
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to the pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        //zero the first and last 4 bytes of the staging buffer so the checksum
        //only sees the row data at its proper 4-byte alignment
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        //tile not complete yet: nothing further to do
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // use the least occupied queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        if (!imin->emplace(InitNextCompression()))
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
#else
            gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
            errno = 0;
            return false;
#endif
        }

        errno = fErrno;
        return errno==0;
    }
347
    /// update the real number of rows
    /// NAXIS2 counts catalog rows (i.e. tiles), ZNAXIS2 the uncompressed data rows.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
355
    /// Setup the environment to compress yet another tile of data
    /// Hands the currently filled row buffer over to a new compression target,
    /// allocates buffers for the transposed and compressed data, and swaps in
    /// a fresh buffer for the next incoming rows.
    /// @return the filled compression target
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }
378
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Groups of shrink_factor consecutive tiles are merged into single catalog
    /// rows (keeping the first row of each group) and fNumRowsPerTile is scaled
    /// accordingly; the factor is recorded in the ZSHRINK keyword.
    /// @return the shrink factor applied (1 if nothing was done)
    const uint32_t ShrinkCatalog()
    {
        //add empty rows until fCatalogSize is a multiple of fNumTiles.
        //NOTE: AddOneCatalogRow() increments fCatalogSize, so the bound of this
        //loop grows by one per iteration; it terminates once fCatalogSize%fNumTiles
        //wraps to zero, i.e. after (fNumTiles - initial remainder) iterations.
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog ! keep row 0; replace each surviving row by the
        //first row of the group it now represents
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            //advance a second iterator to the row that survives the merge
            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the too many entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK", shrink_factor);

        return shrink_factor;
    }
430
    /// close an open file.
    /// Drains the worker queues, flushes the last (partial) tile, assigns the
    /// final heap offsets to every catalog entry, shrinks the catalog if
    /// needed, finalizes all header keywords and verifies the checksum.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            //queues are stopped: process this last tile synchronously
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        //compute the heap size and give every catalog entry its final offset
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                //columns that wrote no bytes get a null offset
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        std::ofstream::close();

        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrunk
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
528
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    /// The column is declared to the base class as a single 64-bit entry ('Q');
    /// its real type and width are tracked in fRealColumns and advertised via
    /// the ZFORMn/ZCTYPn keywords.
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");

        return true;
    }
561
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }
    /// @param num requested number of compression threads; clamped to
    ///        (available cores - 2), keeping one core for the main thread and
    ///        one for the disk writer
    /// @return false if the file is already open (thread count is then frozen)
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        //at least one queue must exist, even for prompt (synchronous) mode
        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }

    uint32_t GetNumTiles() const { return fNumTiles; }  ///< number of pre-reserved catalog tiles
    void SetNumTiles(uint32_t num) { fNumTiles=num; }   ///< set the number of pre-reserved catalog tiles
598
599protected:
600
    /// Allocates the required objects.
    void reallocateBuffers()
    {
        //one tile of rows + one block header per column + the tile header + 8 scratch bytes
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        //row plus padding bytes so WriteRow can stage 4-byte aligned checksum data
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
610
    /// Actually does the writing to disk (and checksuming)
    /// The first 4 bytes of src are reserved scratch space used to re-align the
    /// data on a 4-byte boundary for the running checksum; only src+4 onwards
    /// is actually written to disk.
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        //(the previous block ended off a 4-byte boundary)
        if (fCheckOffset != 0)
        {
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        //remember the misalignment for the next block
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
647
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return always true (so the queue keeps running); the compressed size
    ///         is forwarded to the writing queue instead
    bool CompressBuffer(const CompressionTarget& target)
    {
        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            //store the exception for the main thread; in prompt (synchronous)
            //mode re-throw immediately since we ARE the main thread
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }
688
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    /// @return false if this is not the next tile in sequence (the queue keeps
    ///         it and retries later; QueueMin::begin always yields the smallest
    ///         pending tile), true otherwise
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            //store the exception for the main thread; in prompt (synchronous)
            //mode re-throw immediately since we ARE the main thread
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
717
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row catalog row receiving the (size, offset) of every column's compressed block
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        //the last tile may be only partially filled
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            //apply the column's processing chain step by step
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                    case FITS::kFactRaw:
                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactSmoothing:
                        //in-place transform of src; writes no output bytes itself
                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactHuffman16:
                        if (head.getOrdering() == FITS::kOrderByCol)
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                        else
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                        break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth !
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        //write the tile header in front of the compressed data
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
806
    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    /// @param num_rows total number of rows written so far (used to derive this tile's row count)
    void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
    {
        //the last tile may be only partially filled
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
                case FITS::kOrderByRow:
                    //regular, "semi-transposed" copy: each column's rows end up contiguous
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                    }
                    break;

                case FITS::kOrderByCol:
                    //transposed copy: each element index grouped across all rows
                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                        for (uint32_t k=0;k<thisRoundNumRows;k++)
                        {
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                            dest += fRealColumns[i].col.size;
                        }
                    break;
            };
        }
    }
840
841 /// Specific compression functions
842 /// @param dest the target buffer
843 /// @param src the source buffer
844 /// @param size number of bytes to copy
845 /// @return number of bytes written
846 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
847 {
848 memcpy(dest, src, size);
849 return size;
850 }
851
    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written; a value larger than the uncompressed
    ///         size signals the caller to fall back to raw storage
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        //encode chunk by chunk; each chunk's compressed byte length is stored
        //as a uint32_t prefix table at the start of dest
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        //(the caller detects the oversize from the returned value)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
897
898 /// Applies Thomas' DRS4 smoothing
899 /// @param data where to apply it
900 /// @param numElems how many elements of type int16_t are stored in the buffer
901 /// @return number of bytes modified
902 uint32_t applySMOOTHING(char* data, uint32_t numElems)
903 {
904 int16_t* short_data = reinterpret_cast<int16_t*>(data);
905 for (int j=numElems-1;j>1;j--)
906 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
907
908 return numElems*sizeof(int16_t);
909 }
910
911 /// Apply the inverse transform of the integer smoothing
912 /// @param data where to apply it
913 /// @param numElems how many elements of type int16_t are stored in the buffer
914 /// @return number of bytes modified
915 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
916 {
917 int16_t* short_data = reinterpret_cast<int16_t*>(data);
918 for (uint32_t j=2;j<numElems;j++)
919 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
920
921 return numElems*sizeof(uint16_t);
922 }
923
924
925
    //thread related stuff
    MemoryManager fMemPool;     ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;     ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads); -1 before the first write

    std::vector<Queue<CompressionTarget>> fCompressionQueues;  ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin keeps it ordered by tile index

    // catalog related stuff
    CatalogType fCatalog;       ///< Catalog for this file
    uint32_t fCatalogSize;      ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;         ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile;   ///< Number of rows per tile
    off_t fCatalogOffset;       ///< Offset of the catalog from the beginning of the file (0 until first WriteCatalog)

    // checksum related stuff
    Checksum fCatalogSum;       ///< Checksum of the catalog
    Checksum fRawSum;           ///< Raw sum (specific to FACT)
    int32_t fCheckOffset;       ///< offset to the data pointer to calculate the checksum (misalignment carried between tiles)

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;             ///< the regular column entry
        FITS::Compression block_head;  ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                     ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer;            ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< propagate errno to main thread
966};
967
968#endif
Note: See TracBrowser for help on using the repository browser.