source: trunk/Mars/mcore/zofits.h@ 17749

Last change on this file since 17749 was 17677, checked in by tbretz, 11 years ago
Fixed the last fix... using a variable type which allows hundreds of terabytes to be allocated is for sure no option. Actually, restricting the amount of memory which can be allocated is a feature not a problem. The correct way to fix the problem is to properly cast the variable before it is converted from kB to Byte.
File size: 38.7 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22#ifdef __MARS__ // Needed by CINT to access the structures
23public:
24#endif
25 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
26 template<class S>
27 struct QueueMin : std::list<S>
28 {
29 typename std::list<S>::iterator begin()
30 {
31 return min_element(std::list<S>::begin(), std::list<S>::end());
32 }
33 };
34
35#ifdef __CINT__
36 struct CatalogEntry;
37#else
38 //catalog types
39 struct CatalogEntry
40 {
41 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
42 int64_t first; ///< Size of this column in the tile
43 int64_t second; ///< offset of this column in the tile, from the start of the heap area
44 } __attribute__((__packed__));
45#endif
46
47 typedef std::vector<CatalogEntry> CatalogRow;
48 typedef std::list<CatalogRow> CatalogType;
49
50 /// Parameters required to write a tile to disk
51 struct WriteTarget
52 {
53 bool operator < (const WriteTarget& other) const
54 {
55 return tile_num < other.tile_num;
56 }
57
58 WriteTarget() { }
59 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
60
61 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
62 uint32_t size; ///< Size to write
63 std::shared_ptr<char> data; ///< Memory block to write
64 };
65
66
67 /// Parameters required to compress a tile of data
68 struct CompressionTarget
69 {
70 CompressionTarget(CatalogRow& r) : catalog_entry(r)
71 {}
72
73 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
74 std::shared_ptr<char> src; ///< Original data
75 std::shared_ptr<char> transposed_src; ///< Transposed data
76 WriteTarget target; ///< Compressed data
77 uint32_t num_rows; ///< Number of rows to compress
78 };
79
80public:
81 /// static setter for the default number of threads to use. -1 means all available physical cores
82 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
83 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
84 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
85 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
86
87 /// constructors
88 /// @param numTiles how many data groups should be pre-reserved ?
89 /// @param rowPerTile how many rows will be grouped together in a single tile
90 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
91 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
92 uint32_t rowPerTile = DefaultNumRowsPerTile(),
93 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
94 fMemPool(0, size_t(maxUsableMem)*1000),
95 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
96 {
97 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
98 SetNumThreads(DefaultNumThreads());
99 }
100
101 /// @param fname the target filename
102 /// @param numTiles how many data groups should be pre-reserved ?
103 /// @param rowPerTile how many rows will be grouped together in a single tile
104 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
105 zofits(const char* fname,
106 uint32_t numTiles = DefaultMaxNumTiles(),
107 uint32_t rowPerTile = DefaultNumRowsPerTile(),
108 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
109 fMemPool(0, maxUsableMem*1000),
110 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
111 {
112 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
113 SetNumThreads(DefaultNumThreads());
114 }
115
116 //initialization of member variables
117 /// @param nt number of tiles
118 /// @param rpt number of rows per tile
119 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
120 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
121 {
122 fCheckOffset = 0;
123 fNumQueues = 0;
124
125 fNumTiles = nt==0 ? 1 : nt;
126 fNumRowsPerTile = rpt;
127
128 fRealRowWidth = 0;
129 fCatalogOffset = 0;
130 fCatalogSize = 0;
131
132 fMaxUsableMem = maxUsableMem;
133#ifdef __EXCEPTIONS
134 fThreadsException = std::exception_ptr();
135#endif
136 fErrno = 0;
137 }
138
139 /// write the header of the binary table
140 /// @param name the name of the table to be created
141 /// @return the state of the file
142 virtual bool WriteTableHeader(const char* name="DATA")
143 {
144 reallocateBuffers();
145
146 SetInt("ZNAXIS1", fRealRowWidth);
147
148 ofits::WriteTableHeader(name);
149
150 fCompressionQueues.front().setPromptExecution(fNumQueues==0);
151 fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
152
153 if (fNumQueues != 0)
154 {
155 //start the compression queues
156 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
157 it->start();
158
159 //start the disk writer
160 fWriteToDiskQueue.start();
161 }
162
163 //mark that no tile has been written so far
164 fLatestWrittenTile = -1;
165
166 //no wiring error (in the writing of the data) has occured so far
167 fErrno = 0;
168
169 return good();
170 }
171
172 /// open a new file.
173 /// @param filename the name of the file
174 /// @param Whether or not the name of the extension should be added or not
175 void open(const char* filename, bool addEXTNAMEKey=true)
176 {
177 ofits::open(filename, addEXTNAMEKey);
178
179 //add compression-related header entries
180 SetBool( "ZTABLE", true, "Table is compressed");
181 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
182 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
183 SetInt( "ZPCOUNT", 0, "");
184 SetInt( "ZHEAPPTR", 0, "");
185 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
186 SetInt( "THEAP", 0, "");
187 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
188 SetFloat("ZRATIO", 0, "Compression ratio");
189 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
190
191 fCatalogSize = 0;
192 fRealRowWidth = 0;
193 fCatalogOffset = 0;
194 fCatalogSize = 0;
195 fCheckOffset = 0;
196
197 fRealColumns.clear();
198 fCatalog.clear();
199 fCatalogSum.reset();
200 fRawSum.reset();
201 }
202
203 /// Super method. does nothing as zofits does not know about DrsOffsets
204 /// @return the state of the file
205 virtual bool WriteDrsOffsetsTable()
206 {
207 return good();
208 }
209
210 /// Returns the number of bytes per uncompressed row
211 /// @return number of bytes per uncompressed row
212 uint32_t GetBytesPerRow() const
213 {
214 return fRealRowWidth;
215 }
216
217 /// Write the data catalog
218 /// @return the state of the file
219 bool WriteCatalog()
220 {
221 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
222 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
223
224 // swap the catalog bytes before writing
225 std::vector<char> swapped_catalog(total_catalog_size);
226
227 uint32_t shift = 0;
228 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
229 {
230 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
231 shift += one_catalog_row_size;
232 }
233
234 if (fCatalogSize < fNumTiles)
235 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
236
237 // first time writing ? remember where we are
238 if (fCatalogOffset == 0)
239 fCatalogOffset = tellp();
240
241 // remember where we came from
242 const off_t where_are_we = tellp();
243
244 // write to disk
245 seekp(fCatalogOffset);
246 write(swapped_catalog.data(), total_catalog_size);
247
248 if (where_are_we != fCatalogOffset)
249 seekp(where_are_we);
250
251 // udpate checksum
252 fCatalogSum.reset();
253 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
254
255 return good();
256 }
257
258 /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
259 virtual void DrsOffsetCalibrate(char* )
260 {
261
262 }
263
264 CatalogRow& AddOneCatalogRow()
265 {
266 // add one row to the catalog
267 fCatalog.emplace_back(CatalogRow());
268 fCatalog.back().resize(fTable.num_cols);
269 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
270 *it = CatalogEntry(0,0);
271
272 fCatalogSize++;
273
274 return fCatalog.back();
275 }
276
277 /// write one row of data
278 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
279 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
280 /// errno will then contain the correct error number of the last error which happened during writing.
281 /// @param ptr the source buffer
282 /// @param the number of bytes to write
283 /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
284 bool WriteRow(const void* ptr, size_t cnt, bool = true)
285 {
286 if (cnt != fRealRowWidth)
287 {
288#ifdef __EXCEPTIONS
289 throw std::runtime_error("Wrong size of row given to WriteRow");
290#else
291 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
292 return false;
293#endif
294 }
295
296#ifdef __EXCEPTIONS
297 //check if something hapenned while the compression threads were working
298 //if so, re-throw the exception that was generated
299 if (fThreadsException != std::exception_ptr())
300 std::rethrow_exception(fThreadsException);
301#endif
302
303 //copy current row to pool or rows waiting for compression
304 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
305 memcpy(target_location, ptr, fRealRowWidth);
306
307 //for now, make an extra copy of the data, for RAWSUM checksuming.
308 //Ideally this should be moved to the threads
309 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
310 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
311 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
312 char* buffer = fRawSumBuffer.data() + rawOffset;
313 auto ib = fRawSumBuffer.begin();
314 auto ie = fRawSumBuffer.rbegin();
315 *ib++ = 0;
316 *ib++ = 0;
317 *ib++ = 0;
318 *ib = 0;
319
320 *ie++ = 0;
321 *ie++ = 0;
322 *ie++ = 0;
323 *ie = 0;
324
325 memcpy(buffer, ptr, fRealRowWidth);
326
327 fRawSum.add(fRawSumBuffer, false);
328
329 fTable.num_rows++;
330
331 if (fTable.num_rows % fNumRowsPerTile != 0)
332 {
333 errno = fErrno;
334 return errno==0;
335 }
336
337 // use the least occupied queue
338 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
339
340 if (!imin->emplace(InitNextCompression()))
341 {
342#ifdef __EXCEPTIONS
343 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
344#else
345 gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
346 errno = 0;
347 return false;
348#endif
349 }
350
351 errno = fErrno;
352 return errno==0;
353 }
354
355 /// update the real number of rows
356 void FlushNumRows()
357 {
358 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
359 SetInt("ZNAXIS2", fTable.num_rows);
360 FlushHeader();
361 }
362
363 /// Setup the environment to compress yet another tile of data
364 /// @param target the struct where to host the produced parameters
365 CompressionTarget InitNextCompression()
366 {
367 CompressionTarget target(AddOneCatalogRow());
368
369 //fill up compression target
370 target.src = fSmartBuffer;
371 target.transposed_src = fMemPool.malloc();
372 target.num_rows = fTable.num_rows;
373
374 //fill up write to disk target
375 WriteTarget &write_target = target.target;
376 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
377 write_target.size = 0;
378 write_target.data = fMemPool.malloc();
379
380 //get a new buffer to host the incoming data
381 fSmartBuffer = fMemPool.malloc();
382
383 return target;
384 }
385
386 /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
387 uint32_t ShrinkCatalog()
388 {
389 //add empty row to get either the target number of rows, or a multiple of the allowed size
390 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
391 AddOneCatalogRow();
392
393 //did we write more rows than what the catalog could host ?
394 if (fCatalogSize <= fNumTiles) // nothing to do
395 return 1;
396
397 //always exact as extra rows were added just above
398 const uint32_t shrink_factor = fCatalogSize / fNumTiles;
399
400 //shrink the catalog !
401 uint32_t entry_id = 1;
402 auto it = fCatalog.begin();
403 it++;
404 for (; it != fCatalog.end(); it++)
405 {
406 if (entry_id >= fNumTiles)
407 break;
408
409 const uint32_t target_id = entry_id*shrink_factor;
410
411 auto jt = it;
412 for (uint32_t i=0; i<target_id-entry_id; i++)
413 jt++;
414
415 *it = *jt;
416
417 entry_id++;
418 }
419
420 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
421
422 //remove the too many entries
423 for (uint32_t i=0;i<num_tiles_to_remove;i++)
424 {
425 fCatalog.pop_back();
426 fCatalogSize--;
427 }
428
429 //update header keywords
430 fNumRowsPerTile *= shrink_factor;
431
432 SetInt("ZTILELEN", fNumRowsPerTile);
433 SetInt("ZSHRINK", shrink_factor);
434
435 return shrink_factor;
436 }
437
438 /// close an open file.
439 /// @return the state of the file
440 bool close()
441 {
442 // stop compression and write threads
443 for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
444 it->wait();
445
446 fWriteToDiskQueue.wait();
447
448 if (tellp() < 0)
449 return false;
450
451#ifdef __EXCEPTIONS
452 //check if something hapenned while the compression threads were working
453 //if so, re-throw the exception that was generated
454 if (fThreadsException != std::exception_ptr())
455 std::rethrow_exception(fThreadsException);
456#endif
457
458 //write the last tile of data (if any)
459 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
460 {
461 fWriteToDiskQueue.enablePromptExecution();
462 fCompressionQueues.front().enablePromptExecution();
463 fCompressionQueues.front().emplace(InitNextCompression());
464 }
465
466 AlignTo2880Bytes();
467
468 int64_t heap_size = 0;
469 int64_t compressed_offset = 0;
470 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
471 {
472 compressed_offset += sizeof(FITS::TileHeader);
473 heap_size += sizeof(FITS::TileHeader);
474 for (uint32_t j=0; j<it->size(); j++)
475 {
476 heap_size += (*it)[j].first;
477 (*it)[j].second = compressed_offset;
478 compressed_offset += (*it)[j].first;
479 if ((*it)[j].first == 0)
480 (*it)[j].second = 0;
481 }
482 }
483
484 const uint32_t shrink_factor = ShrinkCatalog();
485
486 //update header keywords
487 SetInt("ZNAXIS1", fRealRowWidth);
488 SetInt("ZNAXIS2", fTable.num_rows);
489
490 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
491
492 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
493 const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
494
495 SetInt("THEAP", total_num_tiles_written*total_catalog_width);
496 SetInt("NAXIS1", total_catalog_width);
497 SetInt("NAXIS2", total_num_tiles_written);
498 SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
499
500 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
501 SetFloat("ZRATIO", compression_ratio);
502
503 //add to the heap size the size of the gap between the catalog and the actual heap
504 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
505
506 SetInt("PCOUNT", heap_size, "size of special data area");
507
508 //Just for updating the fCatalogSum value
509 WriteCatalog();
510
511 fDataSum += fCatalogSum;
512
513 const Checksum checksm = UpdateHeaderChecksum();
514
515 std::ofstream::close();
516
517 fSmartBuffer = std::shared_ptr<char>();
518
519 //restore the number of rows per tile in case the catalog has been shrinked
520 if (shrink_factor != 1)
521 fNumRowsPerTile /= shrink_factor;
522
523 if ((checksm+fDataSum).valid())
524 return true;
525
526 std::ostringstream sout;
527 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
528#ifdef __EXCEPTIONS
529 throw std::runtime_error(sout.str());
530#else
531 gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
532 return false;
533#endif
534 }
535
536 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
537 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
538 const std::string& comment="", bool addHeaderKeys=true)
539 {
540 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
541 }
542
543 /// Overload of the simplified compressed version
544 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
545 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
546 {
547 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
548 return false;
549
550 const size_t size = SizeFromType(typechar);
551
552 Table::Column col;
553 col.name = name;
554 col.type = typechar;
555 col.num = cnt;
556 col.size = size;
557 col.offset = fRealRowWidth;
558
559 fRealRowWidth += size*cnt;
560
561 fRealColumns.emplace_back(col, comp);
562
563 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
564 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
565
566 return true;
567 }
568
569 /// Get and set the actual number of threads for this object
570 int32_t GetNumThreads() const { return fNumQueues; }
571 bool SetNumThreads(uint32_t num)
572 {
573 if (tellp()>0)
574 {
575#ifdef __EXCEPTIONS
576 throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
577#else
578 gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
579#endif
580 return false;
581 }
582
583 //get number of physically available threads
584#ifdef HAVE_BOOST_THREAD
585 unsigned int num_available_cores = boost::thread::hardware_concurrency();
586#else
587 unsigned int num_available_cores = std::thread::hardware_concurrency();
588#endif
589 // could not detect number of available cores from system properties...
590 if (num_available_cores == 0)
591 num_available_cores = 1;
592
593 // leave one core for the main thread and one for the writing
594 if (num > num_available_cores)
595 num = num_available_cores>2 ? num_available_cores-2 : 1;
596
597 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
598 fNumQueues = num;
599
600 return true;
601 }
602
603 uint32_t GetNumTiles() const { return fNumTiles; }
604 void SetNumTiles(uint32_t num) { fNumTiles=num; }
605
606protected:
607
608 /// Allocates the required objects.
609 void reallocateBuffers()
610 {
611 const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
612 fMemPool.setChunkSize(chunk_size);
613
614 fSmartBuffer = fMemPool.malloc();
615 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
616 }
617
618 /// Actually does the writing to disk (and checksuming)
619 /// @param src the buffer to write
620 /// @param sizeToWrite how many bytes should be written
621 /// @return the state of the file
622 bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
623 {
624 char* checkSumPointer = src+4;
625 int32_t extraBytes = 0;
626 uint32_t sizeToChecksum = sizeToWrite;
627
628 //should we extend the array to the left ?
629 if (fCheckOffset != 0)
630 {
631 sizeToChecksum += fCheckOffset;
632 checkSumPointer -= fCheckOffset;
633 memset(checkSumPointer, 0, fCheckOffset);
634 }
635
636 //should we extend the array to the right ?
637 if (sizeToChecksum%4 != 0)
638 {
639 extraBytes = 4 - (sizeToChecksum%4);
640 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
641 sizeToChecksum += extraBytes;
642 }
643
644 //do the checksum
645 fDataSum.add(checkSumPointer, sizeToChecksum);
646
647 fCheckOffset = (4 - extraBytes)%4;
648
649 //write data to disk
650 write(src+4, sizeToWrite);
651
652 return good();
653 }
654
655 /// Compress a given buffer based on the target. This is the method executed by the threads
656 /// @param target the struct hosting the parameters of the compression
657 /// @return number of bytes of the compressed data, or always 1 when used by the Queues
658 bool CompressBuffer(const CompressionTarget& target)
659 {
660 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
661 //Really do not understand what's wrong...
662 //calibrate data if required
663 const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
664 for (uint32_t i=0;i<thisRoundNumRows;i++)
665 {
666 char* target_location = target.src.get() + fRealRowWidth*i;
667 DrsOffsetCalibrate(target_location);
668 }
669#ifdef __EXCEPTIONS
670 try
671 {
672#endif
673 //transpose the original data
674 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
675
676 //compress the buffer
677 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
678
679 //post the result to the writing queue
680 //get a copy so that it becomes non-const
681 fWriteToDiskQueue.emplace(target.target, compressed_size);
682
683#ifdef __EXCEPTIONS
684 }
685 catch (...)
686 {
687 fThreadsException = std::current_exception();
688 if (fNumQueues == 0)
689 std::rethrow_exception(fThreadsException);
690 }
691#endif
692
693 return true;
694 }
695
696 /// Write one compressed tile to disk. This is the method executed by the writing thread
697 /// @param target the struct hosting the write parameters
698 bool WriteBufferToDisk(const WriteTarget& target)
699 {
700 //is this the tile we're supposed to write ?
701 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
702 return false;
703
704 fLatestWrittenTile++;
705
706#ifdef __EXCEPTIONS
707 try
708 {
709#endif
710 //could not write the data to disk
711 if (!writeCompressedDataToDisk(target.data.get(), target.size))
712 fErrno = errno;
713#ifdef __EXCEPTIONS
714 }
715 catch (...)
716 {
717 fThreadsException = std::current_exception();
718 if (fNumQueues == 0)
719 std::rethrow_exception(fThreadsException);
720 }
721#endif
722 return true;
723 }
724
725 /// Compress a given buffer based on its source and destination
726 //src cannot be const, as applySMOOTHING is done in place
727 /// @param dest the buffer hosting the compressed data
728 /// @param src the buffer hosting the transposed data
729 /// @param num_rows the number of uncompressed rows in the transposed buffer
730 /// @param the number of bytes of the compressed data
731 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
732 {
733 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
734 uint32_t offset = 0;
735
736 //skip the checksum reserved area
737 dest += 4;
738
739 //skip the 'TILE' marker and tile size entry
740 uint64_t compressedOffset = sizeof(FITS::TileHeader);
741
742 //now compress each column one by one by calling compression on arrays
743 for (uint32_t i=0;i<fRealColumns.size();i++)
744 {
745 catalog_row[i].second = compressedOffset;
746
747 if (fRealColumns[i].col.num == 0)
748 continue;
749
750 FITS::Compression& head = fRealColumns[i].block_head;
751
752 //set the default byte telling if uncompressed the compressed Flag
753 const uint64_t previousOffset = compressedOffset;
754
755 //skip header data
756 compressedOffset += head.getSizeOnDisk();
757
758 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
759 {
760 switch (head.getProc(j))
761 {
762 case FITS::kFactRaw:
763 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
764 break;
765
766 case FITS::kFactSmoothing:
767 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
768 break;
769
770 case FITS::kFactHuffman16:
771 if (head.getOrdering() == FITS::kOrderByCol)
772 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
773 else
774 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
775 break;
776 }
777 }
778
779 //check if compressed size is larger than uncompressed
780 //if so set flag and redo it uncompressed
781 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
782 {
783 //de-smooth !
784 if (head.getProc(0) == FITS::kFactSmoothing)
785 UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
786
787 FITS::Compression he;
788
789 compressedOffset = previousOffset + he.getSizeOnDisk();
790 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
791
792 he.SetBlockSize(compressedOffset - previousOffset);
793 he.Memcpy(dest+previousOffset);
794
795 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
796
797 catalog_row[i].first = compressedOffset - catalog_row[i].second;
798 continue;
799 }
800
801 head.SetBlockSize(compressedOffset - previousOffset);
802 head.Memcpy(dest + previousOffset);
803
804 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
805 catalog_row[i].first = compressedOffset - catalog_row[i].second;
806 }
807
808 const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
809 memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
810
811 return compressedOffset;
812 }
813
814 /// Transpose a tile to a new buffer
815 /// @param src buffer hosting the regular, row-ordered data
816 /// @param dest the target buffer that will receive the transposed data
817 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
818 {
819 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
820
821 //copy the tile and transpose it
822 for (uint32_t i=0;i<fRealColumns.size();i++)
823 {
824 switch (fRealColumns[i].block_head.getOrdering())
825 {
826 case FITS::kOrderByRow:
827 //regular, "semi-transposed" copy
828 for (uint32_t k=0;k<thisRoundNumRows;k++)
829 {
830 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
831 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
832 }
833 break;
834
835 case FITS::kOrderByCol:
836 //transposed copy
837 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
838 for (uint32_t k=0;k<thisRoundNumRows;k++)
839 {
840 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
841 dest += fRealColumns[i].col.size;
842 }
843 break;
844 };
845 }
846 }
847
848 /// Specific compression functions
849 /// @param dest the target buffer
850 /// @param src the source buffer
851 /// @param size number of bytes to copy
852 /// @return number of bytes written
853 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
854 {
855 memcpy(dest, src, size);
856 return size;
857 }
858
859 /// Do huffman encoding
860 /// @param dest the buffer that will receive the compressed data
861 /// @param src the buffer hosting the transposed data
862 /// @param numRows number of rows of data in the transposed buffer
863 /// @param sizeOfElems size in bytes of one data elements
864 /// @param numRowElems number of elements on each row
865 /// @return number of bytes written
866 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
867 {
868 std::string huffmanOutput;
869 uint32_t previousHuffmanSize = 0;
870
871 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
872 if (numRows < 2)
873 return numRows*sizeOfElems*numRowElems + 1000;
874
875 if (sizeOfElems < 2 )
876 {
877#ifdef __EXCEPTIONS
878 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
879#else
880 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
881 return 0;
882#endif
883 }
884
885 uint32_t huffmanOffset = 0;
886 for (uint32_t j=0;j<numRowElems;j++)
887 {
888 Huffman::Encode(huffmanOutput,
889 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
890 numRows*(sizeOfElems/2));
891 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
892 huffmanOffset += sizeof(uint32_t);
893 previousHuffmanSize = huffmanOutput.size();
894 }
895
896 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
897
898 //only copy if not larger than not-compressed size
899 if (totalSize < numRows*sizeOfElems*numRowElems)
900 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
901
902 return totalSize;
903 }
904
905 /// Applies Thomas' DRS4 smoothing
906 /// @param data where to apply it
907 /// @param numElems how many elements of type int16_t are stored in the buffer
908 /// @return number of bytes modified
909 uint32_t applySMOOTHING(char* data, uint32_t numElems)
910 {
911 int16_t* short_data = reinterpret_cast<int16_t*>(data);
912 for (int j=numElems-1;j>1;j--)
913 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
914
915 return numElems*sizeof(int16_t);
916 }
917
918 /// Apply the inverse transform of the integer smoothing
919 /// @param data where to apply it
920 /// @param numElems how many elements of type int16_t are stored in the buffer
921 /// @return number of bytes modified
922 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
923 {
924 int16_t* short_data = reinterpret_cast<int16_t*>(data);
925 for (uint32_t j=2;j<numElems;j++)
926 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
927
928 return numElems*sizeof(uint16_t);
929 }
930
931
932
933 //thread related stuff
934 MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
935 int32_t fNumQueues; ///< Current number of threads that will be used by this object
936 uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
937 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
938
939 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
940 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)
941
942 // catalog related stuff
943 CatalogType fCatalog; ///< Catalog for this file
944 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
945 uint32_t fNumTiles; ///< Number of pre-reserved tiles
946 uint32_t fNumRowsPerTile; ///< Number of rows per tile
947 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file
948
949 // checksum related stuff
950 Checksum fCatalogSum; ///< Checksum of the catalog
951 Checksum fRawSum; ///< Raw sum (specific to FACT)
952 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
953
954 // data layout related stuff
955 /// Regular columns augmented with compression informations
956 struct CompressedColumn
957 {
958 CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
959 block_head(h)
960 {}
961#ifdef __MARS__ // Needed for the compilation ofthe dictionary
962 CompressedColumn() { }
963#endif
964 Table::Column col; ///< the regular column entry
965 FITS::Compression block_head; ///< the compression data associated with that column
966 };
967 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
968 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
969 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
970 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression
971
972#ifdef __EXCEPTIONS
973 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
974#endif
975 int fErrno; ///< propagate errno to main thread
976};
977
978#endif
Note: See TracBrowser for help on using the repository browser.