source: trunk/Mars/mcore/zofits.h@ 17392

Last change on this file since 17392 was 17302, checked in by tbretz, 11 years ago
fRealRowWidth is know already before the header is written, so it can be updated. That has the advantage that the files headers can be read with fitsdump (I hope) even if the file is not yet closed - otherwise it gives an error that the column size mismatches.
File size: 38.7 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22#ifdef __MARS__ // Needed by CINT to access the structures
23public:
24#endif
25 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
26 template<class S>
27 struct QueueMin : std::list<S>
28 {
29 typename std::list<S>::iterator begin()
30 {
31 return min_element(std::list<S>::begin(), std::list<S>::end());
32 }
33 };
34
35#ifdef __CINT__
36 struct CatalogEntry;
37#else
38 //catalog types
39 struct CatalogEntry
40 {
41 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
42 int64_t first; ///< Size of this column in the tile
43 int64_t second; ///< offset of this column in the tile, from the start of the heap area
44 } __attribute__((__packed__));
45#endif
46
47 typedef std::vector<CatalogEntry> CatalogRow;
48 typedef std::list<CatalogRow> CatalogType;
49
50 /// Parameters required to write a tile to disk
51 struct WriteTarget
52 {
53 bool operator < (const WriteTarget& other) const
54 {
55 return tile_num < other.tile_num;
56 }
57
58 WriteTarget() { }
59 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
60
61 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
62 uint32_t size; ///< Size to write
63 std::shared_ptr<char> data; ///< Memory block to write
64 };
65
66
67 /// Parameters required to compress a tile of data
68 struct CompressionTarget
69 {
70 CompressionTarget(CatalogRow& r) : catalog_entry(r)
71 {}
72
73 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
74 std::shared_ptr<char> src; ///< Original data
75 std::shared_ptr<char> transposed_src; ///< Transposed data
76 WriteTarget target; ///< Compressed data
77 uint32_t num_rows; ///< Number of rows to compress
78 };
79
80public:
81 /// static setter for the default number of threads to use. -1 means all available physical cores
82 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
83 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
84 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
85 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
86
87 /// constructors
88 /// @param numTiles how many data groups should be pre-reserved ?
89 /// @param rowPerTile how many rows will be grouped together in a single tile
90 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
91 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
92 uint32_t rowPerTile = DefaultNumRowsPerTile(),
93 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
94 fMemPool(0, maxUsableMem*1000),
95 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
96 {
97 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
98 SetNumThreads(DefaultNumThreads());
99 }
100
101 /// @param fname the target filename
102 /// @param numTiles how many data groups should be pre-reserved ?
103 /// @param rowPerTile how many rows will be grouped together in a single tile
104 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
105 zofits(const char* fname,
106 uint32_t numTiles = DefaultMaxNumTiles(),
107 uint32_t rowPerTile = DefaultNumRowsPerTile(),
108 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
109 fMemPool(0, maxUsableMem*1000),
110 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
111 {
112 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
113 SetNumThreads(DefaultNumThreads());
114 }
115
116 //initialization of member variables
117 /// @param nt number of tiles
118 /// @param rpt number of rows per tile
119 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
120 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
121 {
122 fCheckOffset = 0;
123 fNumQueues = 0;
124
125 fNumTiles = nt==0 ? 1 : nt;
126 fNumRowsPerTile = rpt;
127
128 fRealRowWidth = 0;
129 fCatalogOffset = 0;
130 fCatalogSize = 0;
131
132 fMaxUsableMem = maxUsableMem;
133#ifdef __EXCEPTIONS
134 fThreadsException = std::exception_ptr();
135#endif
136 fErrno = 0;
137 }
138
139 /// write the header of the binary table
140 /// @param name the name of the table to be created
141 /// @return the state of the file
142 virtual bool WriteTableHeader(const char* name="DATA")
143 {
144 reallocateBuffers();
145
146 SetInt("ZNAXIS1", fRealRowWidth);
147
148 ofits::WriteTableHeader(name);
149
150 fCompressionQueues.front().setPromptExecution(fNumQueues==0);
151 fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
152
153 if (fNumQueues != 0)
154 {
155 //start the compression queues
156 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
157 it->start();
158
159 //start the disk writer
160 fWriteToDiskQueue.start();
161 }
162
163 //mark that no tile has been written so far
164 fLatestWrittenTile = -1;
165
166 //no wiring error (in the writing of the data) has occured so far
167 fErrno = 0;
168
169 return good();
170 }
171
172 /// open a new file.
173 /// @param filename the name of the file
174 /// @param Whether or not the name of the extension should be added or not
175 void open(const char* filename, bool addEXTNAMEKey=true)
176 {
177 ofits::open(filename, addEXTNAMEKey);
178
179 //add compression-related header entries
180 SetBool( "ZTABLE", true, "Table is compressed");
181 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
182 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
183 SetInt( "ZPCOUNT", 0, "");
184 SetInt( "ZHEAPPTR", 0, "");
185 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
186 SetInt( "THEAP", 0, "");
187 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
188 SetFloat("ZRATIO", 0, "Compression ratio");
189 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
190
191 fCatalogSize = 0;
192 fRealRowWidth = 0;
193 fCatalogOffset = 0;
194 fCatalogSize = 0;
195 fCheckOffset = 0;
196
197 fRealColumns.clear();
198 fCatalog.clear();
199 fCatalogSum.reset();
200 fRawSum.reset();
201 }
202
203 /// Super method. does nothing as zofits does not know about DrsOffsets
204 /// @return the state of the file
205 virtual bool WriteDrsOffsetsTable()
206 {
207 return good();
208 }
209
210 /// Returns the number of bytes per uncompressed row
211 /// @return number of bytes per uncompressed row
212 uint32_t GetBytesPerRow() const
213 {
214 return fRealRowWidth;
215 }
216
217 /// Write the data catalog
218 /// @return the state of the file
219 bool WriteCatalog()
220 {
221 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
222 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
223
224 // swap the catalog bytes before writing
225 std::vector<char> swapped_catalog(total_catalog_size);
226
227 uint32_t shift = 0;
228 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
229 {
230 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
231 shift += one_catalog_row_size;
232 }
233
234 if (fCatalogSize < fNumTiles)
235 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
236
237 // first time writing ? remember where we are
238 if (fCatalogOffset == 0)
239 fCatalogOffset = tellp();
240
241 // remember where we came from
242 const off_t where_are_we = tellp();
243
244 // write to disk
245 seekp(fCatalogOffset);
246 write(swapped_catalog.data(), total_catalog_size);
247
248 if (where_are_we != fCatalogOffset)
249 seekp(where_are_we);
250
251 // udpate checksum
252 fCatalogSum.reset();
253 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
254
255 return good();
256 }
257
258 /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
259 virtual void DrsOffsetCalibrate(char* )
260 {
261
262 }
263
264 CatalogRow& AddOneCatalogRow()
265 {
266 // add one row to the catalog
267 fCatalog.emplace_back(CatalogRow());
268 fCatalog.back().resize(fTable.num_cols);
269 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
270 *it = CatalogEntry(0,0);
271
272 fCatalogSize++;
273
274 return fCatalog.back();
275 }
276
277 /// write one row of data
278 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
279 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
280 /// errno will then contain the correct error number of the last error which happened during writing.
281 /// @param ptr the source buffer
282 /// @param the number of bytes to write
283 /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
284 bool WriteRow(const void* ptr, size_t cnt, bool = true)
285 {
286 if (cnt != fRealRowWidth)
287 {
288#ifdef __EXCEPTIONS
289 throw std::runtime_error("Wrong size of row given to WriteRow");
290#else
291 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
292 return false;
293#endif
294 }
295
296#ifdef __EXCEPTIONS
297 //check if something hapenned while the compression threads were working
298 //if so, re-throw the exception that was generated
299 if (fThreadsException != std::exception_ptr())
300 std::rethrow_exception(fThreadsException);
301#endif
302
303 //copy current row to pool or rows waiting for compression
304 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
305 memcpy(target_location, ptr, fRealRowWidth);
306
307 //for now, make an extra copy of the data, for RAWSUM checksuming.
308 //Ideally this should be moved to the threads
309 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
310 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
311 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
312 char* buffer = fRawSumBuffer.data() + rawOffset;
313 auto ib = fRawSumBuffer.begin();
314 auto ie = fRawSumBuffer.rbegin();
315 *ib++ = 0;
316 *ib++ = 0;
317 *ib++ = 0;
318 *ib = 0;
319
320 *ie++ = 0;
321 *ie++ = 0;
322 *ie++ = 0;
323 *ie = 0;
324
325 memcpy(buffer, ptr, fRealRowWidth);
326
327 fRawSum.add(fRawSumBuffer, false);
328
329 fTable.num_rows++;
330
331 if (fTable.num_rows % fNumRowsPerTile != 0)
332 {
333 errno = fErrno;
334 return errno==0;
335 }
336
337 // use the least occupied queue
338 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
339
340 if (!imin->emplace(InitNextCompression()))
341 {
342#ifdef __EXCEPTIONS
343 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
344#else
345 gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
346 errno = 0;
347 return false;
348#endif
349 }
350
351 errno = fErrno;
352 return errno==0;
353 }
354
355 /// update the real number of rows
356 void FlushNumRows()
357 {
358 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
359 SetInt("ZNAXIS2", fTable.num_rows);
360 FlushHeader();
361 }
362
363 /// Setup the environment to compress yet another tile of data
364 /// @param target the struct where to host the produced parameters
365 CompressionTarget InitNextCompression()
366 {
367 CompressionTarget target(AddOneCatalogRow());
368
369 //fill up compression target
370 target.src = fSmartBuffer;
371 target.transposed_src = fMemPool.malloc();
372 target.num_rows = fTable.num_rows;
373
374 //fill up write to disk target
375 WriteTarget &write_target = target.target;
376 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
377 write_target.size = 0;
378 write_target.data = fMemPool.malloc();
379
380 //get a new buffer to host the incoming data
381 fSmartBuffer = fMemPool.malloc();
382
383 return target;
384 }
385
386 /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
387 uint32_t ShrinkCatalog()
388 {
389 //add empty row to get either the target number of rows, or a multiple of the allowed size
390 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
391 AddOneCatalogRow();
392
393 //did we write more rows than what the catalog could host ?
394 if (fCatalogSize <= fNumTiles) // nothing to do
395 return 1;
396
397 //always exact as extra rows were added just above
398 const uint32_t shrink_factor = fCatalogSize / fNumTiles;
399
400 //shrink the catalog !
401 uint32_t entry_id = 1;
402 auto it = fCatalog.begin();
403 it++;
404 for (; it != fCatalog.end(); it++)
405 {
406 if (entry_id >= fNumTiles)
407 break;
408
409 const uint32_t target_id = entry_id*shrink_factor;
410
411 auto jt = it;
412 for (uint32_t i=0; i<target_id-entry_id; i++)
413 jt++;
414
415 *it = *jt;
416
417 entry_id++;
418 }
419
420 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
421
422 //remove the too many entries
423 for (uint32_t i=0;i<num_tiles_to_remove;i++)
424 {
425 fCatalog.pop_back();
426 fCatalogSize--;
427 }
428
429 //update header keywords
430 fNumRowsPerTile *= shrink_factor;
431
432 SetInt("ZTILELEN", fNumRowsPerTile);
433 SetInt("ZSHRINK", shrink_factor);
434
435 return shrink_factor;
436 }
437
438 /// close an open file.
439 /// @return the state of the file
440 bool close()
441 {
442 // stop compression and write threads
443 for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
444 it->wait();
445
446 fWriteToDiskQueue.wait();
447
448 if (tellp() < 0)
449 return false;
450
451#ifdef __EXCEPTIONS
452 //check if something hapenned while the compression threads were working
453 //if so, re-throw the exception that was generated
454 if (fThreadsException != std::exception_ptr())
455 std::rethrow_exception(fThreadsException);
456#endif
457
458 //write the last tile of data (if any)
459 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
460 {
461 fWriteToDiskQueue.enablePromptExecution();
462 fCompressionQueues.front().enablePromptExecution();
463 fCompressionQueues.front().emplace(InitNextCompression());
464 }
465
466 AlignTo2880Bytes();
467
468 int64_t heap_size = 0;
469 int64_t compressed_offset = 0;
470 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
471 {
472 compressed_offset += sizeof(FITS::TileHeader);
473 heap_size += sizeof(FITS::TileHeader);
474 for (uint32_t j=0; j<it->size(); j++)
475 {
476 heap_size += (*it)[j].first;
477 (*it)[j].second = compressed_offset;
478 compressed_offset += (*it)[j].first;
479 if ((*it)[j].first == 0)
480 (*it)[j].second = 0;
481 }
482 }
483
484 const uint32_t shrink_factor = ShrinkCatalog();
485
486 //update header keywords
487 SetInt("ZNAXIS1", fRealRowWidth);
488 SetInt("ZNAXIS2", fTable.num_rows);
489
490 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
491
492 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
493 const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
494
495 SetInt("THEAP", total_num_tiles_written*total_catalog_width);
496 SetInt("NAXIS1", total_catalog_width);
497 SetInt("NAXIS2", total_num_tiles_written);
498 SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
499
500 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
501 SetFloat("ZRATIO", compression_ratio);
502
503 //add to the heap size the size of the gap between the catalog and the actual heap
504 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
505
506 SetInt("PCOUNT", heap_size, "size of special data area");
507
508 //Just for updating the fCatalogSum value
509 WriteCatalog();
510
511 fDataSum += fCatalogSum;
512
513 const Checksum checksm = UpdateHeaderChecksum();
514
515 std::ofstream::close();
516
517 fSmartBuffer = std::shared_ptr<char>();
518
519 //restore the number of rows per tile in case the catalog has been shrinked
520 if (shrink_factor != 1)
521 fNumRowsPerTile /= shrink_factor;
522
523 if ((checksm+fDataSum).valid())
524 return true;
525
526 std::ostringstream sout;
527 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
528#ifdef __EXCEPTIONS
529 throw std::runtime_error(sout.str());
530#else
531 gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
532 return false;
533#endif
534 }
535
536 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
537 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
538 const std::string& comment="", bool addHeaderKeys=true)
539 {
540 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
541 }
542
543 /// Overload of the simplified compressed version
544 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
545 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
546 {
547 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
548 return false;
549
550 const size_t size = SizeFromType(typechar);
551
552 Table::Column col;
553 col.name = name;
554 col.type = typechar;
555 col.num = cnt;
556 col.size = size;
557 col.offset = fRealRowWidth;
558
559 fRealRowWidth += size*cnt;
560
561 fRealColumns.emplace_back(col, comp);
562
563 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
564 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
565
566 return true;
567 }
568
569 /// Get and set the actual number of threads for this object
570 int32_t GetNumThreads() const { return fNumQueues; }
571 bool SetNumThreads(uint32_t num)
572 {
573 if (tellp()>0)
574 {
575#ifdef __EXCEPTIONS
576 throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
577#else
578 gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
579#endif
580 return false;
581 }
582
583 //get number of physically available threads
584#ifdef HAVE_BOOST_THREAD
585 unsigned int num_available_cores = boost::thread::hardware_concurrency();
586#else
587 unsigned int num_available_cores = std::thread::hardware_concurrency();
588#endif
589 // could not detect number of available cores from system properties...
590 if (num_available_cores == 0)
591 num_available_cores = 1;
592
593 // leave one core for the main thread and one for the writing
594 if (num > num_available_cores)
595 num = num_available_cores>2 ? num_available_cores-2 : 1;
596
597 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
598 fNumQueues = num;
599
600 return true;
601 }
602
603 uint32_t GetNumTiles() const { return fNumTiles; }
604 void SetNumTiles(uint32_t num) { fNumTiles=num; }
605
606protected:
607
608 /// Allocates the required objects.
609 void reallocateBuffers()
610 {
611 const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
612 fMemPool.setChunkSize(chunk_size);
613
614 fSmartBuffer = fMemPool.malloc();
615 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
616 }
617
618 /// Actually does the writing to disk (and checksuming)
619 /// @param src the buffer to write
620 /// @param sizeToWrite how many bytes should be written
621 /// @return the state of the file
622 bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
623 {
624 char* checkSumPointer = src+4;
625 int32_t extraBytes = 0;
626 uint32_t sizeToChecksum = sizeToWrite;
627
628 //should we extend the array to the left ?
629 if (fCheckOffset != 0)
630 {
631 sizeToChecksum += fCheckOffset;
632 checkSumPointer -= fCheckOffset;
633 memset(checkSumPointer, 0, fCheckOffset);
634 }
635
636 //should we extend the array to the right ?
637 if (sizeToChecksum%4 != 0)
638 {
639 extraBytes = 4 - (sizeToChecksum%4);
640 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
641 sizeToChecksum += extraBytes;
642 }
643
644 //do the checksum
645 fDataSum.add(checkSumPointer, sizeToChecksum);
646
647 fCheckOffset = (4 - extraBytes)%4;
648
649 //write data to disk
650 write(src+4, sizeToWrite);
651
652 return good();
653 }
654
655 /// Compress a given buffer based on the target. This is the method executed by the threads
656 /// @param target the struct hosting the parameters of the compression
657 /// @return number of bytes of the compressed data, or always 1 when used by the Queues
658 bool CompressBuffer(const CompressionTarget& target)
659 {
660 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
661 //Really do not understand what's wrong...
662 //calibrate data if required
663 const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
664 for (uint32_t i=0;i<thisRoundNumRows;i++)
665 {
666 char* target_location = target.src.get() + fRealRowWidth*i;
667 DrsOffsetCalibrate(target_location);
668 }
669#ifdef __EXCEPTIONS
670 try
671 {
672#endif
673 //transpose the original data
674 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
675
676 //compress the buffer
677 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
678
679 //post the result to the writing queue
680 //get a copy so that it becomes non-const
681 fWriteToDiskQueue.emplace(target.target, compressed_size);
682
683#ifdef __EXCEPTIONS
684 }
685 catch (...)
686 {
687 fThreadsException = std::current_exception();
688 if (fNumQueues == 0)
689 std::rethrow_exception(fThreadsException);
690 }
691#endif
692
693 return true;
694 }
695
696 /// Write one compressed tile to disk. This is the method executed by the writing thread
697 /// @param target the struct hosting the write parameters
698 bool WriteBufferToDisk(const WriteTarget& target)
699 {
700 //is this the tile we're supposed to write ?
701 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
702 return false;
703
704 fLatestWrittenTile++;
705
706#ifdef __EXCEPTIONS
707 try
708 {
709#endif
710 //could not write the data to disk
711 if (!writeCompressedDataToDisk(target.data.get(), target.size))
712 fErrno = errno;
713#ifdef __EXCEPTIONS
714 }
715 catch (...)
716 {
717 fThreadsException = std::current_exception();
718 if (fNumQueues == 0)
719 std::rethrow_exception(fThreadsException);
720 }
721#endif
722 return true;
723 }
724
725 /// Compress a given buffer based on its source and destination
726 //src cannot be const, as applySMOOTHING is done in place
727 /// @param dest the buffer hosting the compressed data
728 /// @param src the buffer hosting the transposed data
729 /// @param num_rows the number of uncompressed rows in the transposed buffer
730 /// @param the number of bytes of the compressed data
731 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
732 {
733 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
734 uint32_t offset = 0;
735
736 //skip the checksum reserved area
737 dest += 4;
738
739 //skip the 'TILE' marker and tile size entry
740 uint64_t compressedOffset = sizeof(FITS::TileHeader);
741
742 //now compress each column one by one by calling compression on arrays
743 for (uint32_t i=0;i<fRealColumns.size();i++)
744 {
745 catalog_row[i].second = compressedOffset;
746
747 if (fRealColumns[i].col.num == 0)
748 continue;
749
750 FITS::Compression& head = fRealColumns[i].block_head;
751
752 //set the default byte telling if uncompressed the compressed Flag
753 const uint64_t previousOffset = compressedOffset;
754
755 //skip header data
756 compressedOffset += head.getSizeOnDisk();
757
758 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
759 {
760 switch (head.getProc(j))
761 {
762 case FITS::kFactRaw:
763 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
764 break;
765
766 case FITS::kFactSmoothing:
767 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
768 break;
769
770 case FITS::kFactHuffman16:
771 if (head.getOrdering() == FITS::kOrderByCol)
772 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
773 else
774 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
775 break;
776 }
777 }
778
779 //check if compressed size is larger than uncompressed
780 //if so set flag and redo it uncompressed
781 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
782 {
783 //de-smooth !
784 if (head.getProc(0) == FITS::kFactSmoothing)
785 UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
786
787 FITS::Compression he;
788
789 compressedOffset = previousOffset + he.getSizeOnDisk();
790 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
791
792 he.SetBlockSize(compressedOffset - previousOffset);
793 he.Memcpy(dest+previousOffset);
794
795 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
796
797 catalog_row[i].first = compressedOffset - catalog_row[i].second;
798 continue;
799 }
800
801 head.SetBlockSize(compressedOffset - previousOffset);
802 head.Memcpy(dest + previousOffset);
803
804 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
805 catalog_row[i].first = compressedOffset - catalog_row[i].second;
806 }
807
808 const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
809 memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
810
811 return compressedOffset;
812 }
813
814 /// Transpose a tile to a new buffer
815 /// @param src buffer hosting the regular, row-ordered data
816 /// @param dest the target buffer that will receive the transposed data
817 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
818 {
819 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
820
821 //copy the tile and transpose it
822 for (uint32_t i=0;i<fRealColumns.size();i++)
823 {
824 switch (fRealColumns[i].block_head.getOrdering())
825 {
826 case FITS::kOrderByRow:
827 //regular, "semi-transposed" copy
828 for (uint32_t k=0;k<thisRoundNumRows;k++)
829 {
830 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
831 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
832 }
833 break;
834
835 case FITS::kOrderByCol:
836 //transposed copy
837 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
838 for (uint32_t k=0;k<thisRoundNumRows;k++)
839 {
840 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
841 dest += fRealColumns[i].col.size;
842 }
843 break;
844 };
845 }
846 }
847
848 /// Specific compression functions
849 /// @param dest the target buffer
850 /// @param src the source buffer
851 /// @param size number of bytes to copy
852 /// @return number of bytes written
853 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
854 {
855 memcpy(dest, src, size);
856 return size;
857 }
858
859 /// Do huffman encoding
860 /// @param dest the buffer that will receive the compressed data
861 /// @param src the buffer hosting the transposed data
862 /// @param numRows number of rows of data in the transposed buffer
863 /// @param sizeOfElems size in bytes of one data elements
864 /// @param numRowElems number of elements on each row
865 /// @return number of bytes written
866 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
867 {
868 std::string huffmanOutput;
869 uint32_t previousHuffmanSize = 0;
870
871 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
872 if (numRows < 2)
873 return numRows*sizeOfElems*numRowElems + 1000;
874
875 if (sizeOfElems < 2 )
876 {
877#ifdef __EXCEPTIONS
878 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
879#else
880 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
881 return 0;
882#endif
883 }
884
885 uint32_t huffmanOffset = 0;
886 for (uint32_t j=0;j<numRowElems;j++)
887 {
888 Huffman::Encode(huffmanOutput,
889 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
890 numRows*(sizeOfElems/2));
891 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
892 huffmanOffset += sizeof(uint32_t);
893 previousHuffmanSize = huffmanOutput.size();
894 }
895
896 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
897
898 //only copy if not larger than not-compressed size
899 if (totalSize < numRows*sizeOfElems*numRowElems)
900 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
901
902 return totalSize;
903 }
904
905 /// Applies Thomas' DRS4 smoothing
906 /// @param data where to apply it
907 /// @param numElems how many elements of type int16_t are stored in the buffer
908 /// @return number of bytes modified
909 uint32_t applySMOOTHING(char* data, uint32_t numElems)
910 {
911 int16_t* short_data = reinterpret_cast<int16_t*>(data);
912 for (int j=numElems-1;j>1;j--)
913 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
914
915 return numElems*sizeof(int16_t);
916 }
917
918 /// Apply the inverse transform of the integer smoothing
919 /// @param data where to apply it
920 /// @param numElems how many elements of type int16_t are stored in the buffer
921 /// @return number of bytes modified
922 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
923 {
924 int16_t* short_data = reinterpret_cast<int16_t*>(data);
925 for (uint32_t j=2;j<numElems;j++)
926 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
927
928 return numElems*sizeof(uint16_t);
929 }
930
931
932
933 //thread related stuff
934 MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
935 int32_t fNumQueues; ///< Current number of threads that will be used by this object
936 uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
937 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
938
939 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
940 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)
941
942 // catalog related stuff
943 CatalogType fCatalog; ///< Catalog for this file
944 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
945 uint32_t fNumTiles; ///< Number of pre-reserved tiles
946 uint32_t fNumRowsPerTile; ///< Number of rows per tile
947 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file
948
949 // checksum related stuff
950 Checksum fCatalogSum; ///< Checksum of the catalog
951 Checksum fRawSum; ///< Raw sum (specific to FACT)
952 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
953
954 // data layout related stuff
955 /// Regular columns augmented with compression informations
956 struct CompressedColumn
957 {
958 CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
959 block_head(h)
960 {}
961#ifdef __MARS__ // Needed for the compilation ofthe dictionary
962 CompressedColumn() { }
963#endif
964 Table::Column col; ///< the regular column entry
965 FITS::Compression block_head; ///< the compression data associated with that column
966 };
967 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
968 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
969 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
970 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression
971
972#ifdef __EXCEPTIONS
973 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
974#endif
975 int fErrno; ///< propagate errno to main thread
976};
977
978#endif
Note: See TracBrowser for help on using the repository browser.