source: trunk/Mars/mcore/zofits.h@ 17778

Last change on this file since 17778 was 17766, checked in by tbretz, 11 years ago
Removing the std::exception_ptr by an ifdef is dangerous, because CINT will never set that define... therefore, make sure the correct size is always there.
File size: 39.7 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20#if defined(__CINT__) || !defined(__EXCEPTIONS)
21namespace std
22{
23 typedef void* exception_ptr;
24};
25#endif
26
27class zofits : public ofits
28{
29#ifdef __MARS__ // Needed by CINT to access the structures
30public:
31#endif
32 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
33 template<class S>
34 struct QueueMin : std::list<S>
35 {
36 typename std::list<S>::iterator begin()
37 {
38 return min_element(std::list<S>::begin(), std::list<S>::end());
39 }
40 };
41
42#ifdef __CINT__
43 // CINT doesn't like the packed attribute...
44 // Therefore we give another hint of the size of the structure
45 struct CatalogEntry { char dummy[16]; };
46#else
47 //catalog types
48 struct CatalogEntry
49 {
50 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
51 int64_t first; ///< Size of this column in the tile
52 int64_t second; ///< offset of this column in the tile, from the start of the heap area
53 } __attribute__((__packed__));
54#endif
55
56 typedef std::vector<CatalogEntry> CatalogRow;
57 typedef std::list<CatalogRow> CatalogType;
58
59 /// Parameters required to write a tile to disk
60 struct WriteTarget
61 {
62 bool operator < (const WriteTarget& other) const
63 {
64 return tile_num < other.tile_num;
65 }
66
67 WriteTarget() { }
68 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
69
70 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
71 uint32_t size; ///< Size to write
72 std::shared_ptr<char> data; ///< Memory block to write
73 };
74
75
76 /// Parameters required to compress a tile of data
77 struct CompressionTarget
78 {
79 CompressionTarget(CatalogRow& r) : catalog_entry(r)
80 {}
81
82 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
83 std::shared_ptr<char> src; ///< Original data
84 std::shared_ptr<char> transposed_src; ///< Transposed data
85 WriteTarget target; ///< Compressed data
86 uint32_t num_rows; ///< Number of rows to compress
87 };
88
89public:
90 /// static setter for the default number of threads to use. -1 means all available physical cores
91 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
92 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
93 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
94 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
95
96 /// constructors
97 /// @param numTiles how many data groups should be pre-reserved ?
98 /// @param rowPerTile how many rows will be grouped together in a single tile
99 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
100 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
101 uint32_t rowPerTile = DefaultNumRowsPerTile(),
102 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
103 fMemPool(0, size_t(maxUsableMem)*1000),
104 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
105 {
106 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
107 SetNumThreads(DefaultNumThreads());
108 }
109
110 /// @param fname the target filename
111 /// @param numTiles how many data groups should be pre-reserved ?
112 /// @param rowPerTile how many rows will be grouped together in a single tile
113 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
114 zofits(const char* fname,
115 uint32_t numTiles = DefaultMaxNumTiles(),
116 uint32_t rowPerTile = DefaultNumRowsPerTile(),
117 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
118 fMemPool(0, size_t(maxUsableMem)*1000),
119 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
120 {
121 open(fname);
122 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
123 SetNumThreads(DefaultNumThreads());
124 }
125
126 zofits(const std::string &fname,
127 uint32_t numTiles = DefaultMaxNumTiles(),
128 uint32_t rowPerTile = DefaultNumRowsPerTile(),
129 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
130 fMemPool(0, size_t(maxUsableMem)*1000),
131 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
132 {
133 open(fname);
134 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
135 SetNumThreads(DefaultNumThreads());
136 }
137
138 //initialization of member variables
139 /// @param nt number of tiles
140 /// @param rpt number of rows per tile
141 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
142 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
143 {
144 fCheckOffset = 0;
145 fNumQueues = 0;
146
147 fNumTiles = nt==0 ? 1 : nt;
148 fNumRowsPerTile = rpt;
149
150 fRealRowWidth = 0;
151 fCatalogOffset = 0;
152 fCatalogSize = 0;
153
154 fMaxUsableMem = maxUsableMem;
155#ifdef __EXCEPTIONS
156 fThreadsException = std::exception_ptr();
157#endif
158 fErrno = 0;
159 }
160
161 /// write the header of the binary table
162 /// @param name the name of the table to be created
163 /// @return the state of the file
164 virtual bool WriteTableHeader(const char* name="DATA")
165 {
166 reallocateBuffers();
167
168 SetInt("ZNAXIS1", fRealRowWidth);
169
170 ofits::WriteTableHeader(name);
171
172 fCompressionQueues.front().setPromptExecution(fNumQueues==0);
173 fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
174
175 if (fNumQueues != 0)
176 {
177 //start the compression queues
178 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
179 it->start();
180
181 //start the disk writer
182 fWriteToDiskQueue.start();
183 }
184
185 //mark that no tile has been written so far
186 fLatestWrittenTile = -1;
187
188 //no wiring error (in the writing of the data) has occured so far
189 fErrno = 0;
190
191 return good();
192 }
193
194 /// open a new file.
195 /// @param filename the name of the file
196 /// @param Whether or not the name of the extension should be added or not
197 void open(const char* filename, bool addEXTNAMEKey=true)
198 {
199 ofits::open(filename, addEXTNAMEKey);
200
201 //add compression-related header entries
202 SetBool( "ZTABLE", true, "Table is compressed");
203 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
204 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
205 SetInt( "ZPCOUNT", 0, "");
206 SetInt( "ZHEAPPTR", 0, "");
207 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
208 SetInt( "THEAP", 0, "");
209 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
210 SetFloat("ZRATIO", 0, "Compression ratio");
211 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
212
213 fCatalogSize = 0;
214 fRealRowWidth = 0;
215 fCatalogOffset = 0;
216 fCatalogSize = 0;
217 fCheckOffset = 0;
218
219 fRealColumns.clear();
220 fCatalog.clear();
221 fCatalogSum.reset();
222 fRawSum.reset();
223 }
224
225 void open(const std::string &filename, bool addEXTNAMEKey=true)
226 {
227 open(filename.c_str(), addEXTNAMEKey);
228 }
229
230 /// Super method. does nothing as zofits does not know about DrsOffsets
231 /// @return the state of the file
232 virtual bool WriteDrsOffsetsTable()
233 {
234 return good();
235 }
236
237 /// Returns the number of bytes per uncompressed row
238 /// @return number of bytes per uncompressed row
239 uint32_t GetBytesPerRow() const
240 {
241 return fRealRowWidth;
242 }
243
244 /// Write the data catalog
245 /// @return the state of the file
246 bool WriteCatalog()
247 {
248 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
249 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
250
251 // swap the catalog bytes before writing
252 std::vector<char> swapped_catalog(total_catalog_size);
253
254 uint32_t shift = 0;
255 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
256 {
257 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
258 shift += one_catalog_row_size;
259 }
260
261 if (fCatalogSize < fNumTiles)
262 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
263
264 // first time writing ? remember where we are
265 if (fCatalogOffset == 0)
266 fCatalogOffset = tellp();
267
268 // remember where we came from
269 const off_t where_are_we = tellp();
270
271 // write to disk
272 seekp(fCatalogOffset);
273 write(swapped_catalog.data(), total_catalog_size);
274
275 if (where_are_we != fCatalogOffset)
276 seekp(where_are_we);
277
278 // udpate checksum
279 fCatalogSum.reset();
280 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
281
282 return good();
283 }
284
285 /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
286 virtual void DrsOffsetCalibrate(char* )
287 {
288
289 }
290
291 CatalogRow& AddOneCatalogRow()
292 {
293 // add one row to the catalog
294 fCatalog.emplace_back(CatalogRow());
295 fCatalog.back().resize(fTable.num_cols);
296 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
297 *it = CatalogEntry(0,0);
298
299 fCatalogSize++;
300
301 return fCatalog.back();
302 }
303
304 /// write one row of data
305 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
306 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
307 /// errno will then contain the correct error number of the last error which happened during writing.
308 /// @param ptr the source buffer
309 /// @param the number of bytes to write
310 /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
311 bool WriteRow(const void* ptr, size_t cnt, bool = true)
312 {
313 if (cnt != fRealRowWidth)
314 {
315#ifdef __EXCEPTIONS
316 throw std::runtime_error("Wrong size of row given to WriteRow");
317#else
318 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
319 return false;
320#endif
321 }
322
323#ifdef __EXCEPTIONS
324 //check if something hapenned while the compression threads were working
325 //if so, re-throw the exception that was generated
326 if (fThreadsException != std::exception_ptr())
327 std::rethrow_exception(fThreadsException);
328#endif
329
330 //copy current row to pool or rows waiting for compression
331 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
332 memcpy(target_location, ptr, fRealRowWidth);
333
334 //for now, make an extra copy of the data, for RAWSUM checksuming.
335 //Ideally this should be moved to the threads
336 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
337 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
338 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
339 char* buffer = fRawSumBuffer.data() + rawOffset;
340 auto ib = fRawSumBuffer.begin();
341 auto ie = fRawSumBuffer.rbegin();
342 *ib++ = 0;
343 *ib++ = 0;
344 *ib++ = 0;
345 *ib = 0;
346
347 *ie++ = 0;
348 *ie++ = 0;
349 *ie++ = 0;
350 *ie = 0;
351
352 memcpy(buffer, ptr, fRealRowWidth);
353
354 fRawSum.add(fRawSumBuffer, false);
355
356 fTable.num_rows++;
357
358 if (fTable.num_rows % fNumRowsPerTile != 0)
359 {
360 errno = fErrno;
361 return errno==0;
362 }
363
364 // use the least occupied queue
365 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
366
367 if (!imin->emplace(InitNextCompression()))
368 {
369#ifdef __EXCEPTIONS
370 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
371#else
372 gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
373 errno = 0;
374 return false;
375#endif
376 }
377
378 errno = fErrno;
379 return errno==0;
380 }
381
382 /// update the real number of rows
383 void FlushNumRows()
384 {
385 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
386 SetInt("ZNAXIS2", fTable.num_rows);
387 FlushHeader();
388 }
389
390 /// Setup the environment to compress yet another tile of data
391 /// @param target the struct where to host the produced parameters
392 CompressionTarget InitNextCompression()
393 {
394 CompressionTarget target(AddOneCatalogRow());
395
396 //fill up compression target
397 target.src = fSmartBuffer;
398 target.transposed_src = fMemPool.malloc();
399 target.num_rows = fTable.num_rows;
400
401 //fill up write to disk target
402 WriteTarget &write_target = target.target;
403 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
404 write_target.size = 0;
405 write_target.data = fMemPool.malloc();
406
407 //get a new buffer to host the incoming data
408 fSmartBuffer = fMemPool.malloc();
409
410 return target;
411 }
412
413 /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
414 uint32_t ShrinkCatalog()
415 {
416 //add empty row to get either the target number of rows, or a multiple of the allowed size
417 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
418 AddOneCatalogRow();
419
420 //did we write more rows than what the catalog could host ?
421 if (fCatalogSize <= fNumTiles) // nothing to do
422 return 1;
423
424 //always exact as extra rows were added just above
425 const uint32_t shrink_factor = fCatalogSize / fNumTiles;
426
427 //shrink the catalog !
428 uint32_t entry_id = 1;
429 auto it = fCatalog.begin();
430 it++;
431 for (; it != fCatalog.end(); it++)
432 {
433 if (entry_id >= fNumTiles)
434 break;
435
436 const uint32_t target_id = entry_id*shrink_factor;
437
438 auto jt = it;
439 for (uint32_t i=0; i<target_id-entry_id; i++)
440 jt++;
441
442 *it = *jt;
443
444 entry_id++;
445 }
446
447 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
448
449 //remove the too many entries
450 for (uint32_t i=0;i<num_tiles_to_remove;i++)
451 {
452 fCatalog.pop_back();
453 fCatalogSize--;
454 }
455
456 //update header keywords
457 fNumRowsPerTile *= shrink_factor;
458
459 SetInt("ZTILELEN", fNumRowsPerTile);
460 SetInt("ZSHRINK", shrink_factor);
461
462 return shrink_factor;
463 }
464
465 /// close an open file.
466 /// @return the state of the file
467 bool close()
468 {
469 // stop compression and write threads
470 for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
471 it->wait();
472
473 fWriteToDiskQueue.wait();
474
475 if (tellp() < 0)
476 return false;
477
478#ifdef __EXCEPTIONS
479 //check if something hapenned while the compression threads were working
480 //if so, re-throw the exception that was generated
481 if (fThreadsException != std::exception_ptr())
482 std::rethrow_exception(fThreadsException);
483#endif
484
485 //write the last tile of data (if any)
486 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
487 {
488 fWriteToDiskQueue.enablePromptExecution();
489 fCompressionQueues.front().enablePromptExecution();
490 fCompressionQueues.front().emplace(InitNextCompression());
491 }
492
493 AlignTo2880Bytes();
494
495 int64_t heap_size = 0;
496 int64_t compressed_offset = 0;
497 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
498 {
499 compressed_offset += sizeof(FITS::TileHeader);
500 heap_size += sizeof(FITS::TileHeader);
501 for (uint32_t j=0; j<it->size(); j++)
502 {
503 heap_size += (*it)[j].first;
504 (*it)[j].second = compressed_offset;
505 compressed_offset += (*it)[j].first;
506 if ((*it)[j].first == 0)
507 (*it)[j].second = 0;
508 }
509 }
510
511 const uint32_t shrink_factor = ShrinkCatalog();
512
513 //update header keywords
514 SetInt("ZNAXIS1", fRealRowWidth);
515 SetInt("ZNAXIS2", fTable.num_rows);
516
517 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
518
519 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
520 const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
521
522 SetInt("THEAP", total_num_tiles_written*total_catalog_width);
523 SetInt("NAXIS1", total_catalog_width);
524 SetInt("NAXIS2", total_num_tiles_written);
525 SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
526
527 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
528 SetFloat("ZRATIO", compression_ratio);
529
530 //add to the heap size the size of the gap between the catalog and the actual heap
531 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
532
533 SetInt("PCOUNT", heap_size, "size of special data area");
534
535 //Just for updating the fCatalogSum value
536 WriteCatalog();
537
538 fDataSum += fCatalogSum;
539
540 const Checksum checksm = UpdateHeaderChecksum();
541
542 if (!fFilebuf.close())
543 setstate(ios_base::failbit);
544
545 fSmartBuffer = std::shared_ptr<char>();
546
547 //restore the number of rows per tile in case the catalog has been shrinked
548 if (shrink_factor != 1)
549 fNumRowsPerTile /= shrink_factor;
550
551 if ((checksm+fDataSum).valid())
552 return true;
553
554 std::ostringstream sout;
555 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
556#ifdef __EXCEPTIONS
557 throw std::runtime_error(sout.str());
558#else
559 gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
560 return false;
561#endif
562 }
563
564 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
565 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
566 const std::string& comment="", bool addHeaderKeys=true)
567 {
568 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
569 }
570
571 /// Overload of the simplified compressed version
572 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
573 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
574 {
575 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
576 return false;
577
578 const size_t size = SizeFromType(typechar);
579
580 Table::Column col;
581 col.name = name;
582 col.type = typechar;
583 col.num = cnt;
584 col.size = size;
585 col.offset = fRealRowWidth;
586
587 fRealRowWidth += size*cnt;
588
589 fRealColumns.emplace_back(col, comp);
590
591 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
592 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
593
594 return true;
595 }
596
597 /// Get and set the actual number of threads for this object
598 int32_t GetNumThreads() const { return fNumQueues; }
599 bool SetNumThreads(uint32_t num)
600 {
601 if (tellp()>0)
602 {
603#ifdef __EXCEPTIONS
604 throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
605#else
606 gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
607#endif
608 return false;
609 }
610
611 //get number of physically available threads
612#ifdef HAVE_BOOST_THREAD
613 unsigned int num_available_cores = boost::thread::hardware_concurrency();
614#else
615 unsigned int num_available_cores = std::thread::hardware_concurrency();
616#endif
617 // could not detect number of available cores from system properties...
618 if (num_available_cores == 0)
619 num_available_cores = 1;
620
621 // leave one core for the main thread and one for the writing
622 if (num > num_available_cores)
623 num = num_available_cores>2 ? num_available_cores-2 : 1;
624
625 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
626 fNumQueues = num;
627
628 return true;
629 }
630
631 uint32_t GetNumTiles() const { return fNumTiles; }
632 void SetNumTiles(uint32_t num) { fNumTiles=num; }
633
634protected:
635
636 /// Allocates the required objects.
637 void reallocateBuffers()
638 {
639 const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
640 fMemPool.setChunkSize(chunk_size);
641
642 fSmartBuffer = fMemPool.malloc();
643 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
644 }
645
646 /// Actually does the writing to disk (and checksuming)
647 /// @param src the buffer to write
648 /// @param sizeToWrite how many bytes should be written
649 /// @return the state of the file
650 bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
651 {
652 char* checkSumPointer = src+4;
653 int32_t extraBytes = 0;
654 uint32_t sizeToChecksum = sizeToWrite;
655
656 //should we extend the array to the left ?
657 if (fCheckOffset != 0)
658 {
659 sizeToChecksum += fCheckOffset;
660 checkSumPointer -= fCheckOffset;
661 memset(checkSumPointer, 0, fCheckOffset);
662 }
663
664 //should we extend the array to the right ?
665 if (sizeToChecksum%4 != 0)
666 {
667 extraBytes = 4 - (sizeToChecksum%4);
668 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
669 sizeToChecksum += extraBytes;
670 }
671
672 //do the checksum
673 fDataSum.add(checkSumPointer, sizeToChecksum);
674
675 fCheckOffset = (4 - extraBytes)%4;
676
677 //write data to disk
678 write(src+4, sizeToWrite);
679
680 return good();
681 }
682
683 /// Compress a given buffer based on the target. This is the method executed by the threads
684 /// @param target the struct hosting the parameters of the compression
685 /// @return number of bytes of the compressed data, or always 1 when used by the Queues
686 bool CompressBuffer(const CompressionTarget& target)
687 {
688 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
689 //Really do not understand what's wrong...
690 //calibrate data if required
691 const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
692 for (uint32_t i=0;i<thisRoundNumRows;i++)
693 {
694 char* target_location = target.src.get() + fRealRowWidth*i;
695 DrsOffsetCalibrate(target_location);
696 }
697#ifdef __EXCEPTIONS
698 try
699 {
700#endif
701 //transpose the original data
702 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
703
704 //compress the buffer
705 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
706
707 //post the result to the writing queue
708 //get a copy so that it becomes non-const
709 fWriteToDiskQueue.emplace(target.target, compressed_size);
710
711#ifdef __EXCEPTIONS
712 }
713 catch (...)
714 {
715 fThreadsException = std::current_exception();
716 if (fNumQueues == 0)
717 std::rethrow_exception(fThreadsException);
718 }
719#endif
720
721 return true;
722 }
723
724 /// Write one compressed tile to disk. This is the method executed by the writing thread
725 /// @param target the struct hosting the write parameters
726 bool WriteBufferToDisk(const WriteTarget& target)
727 {
728 //is this the tile we're supposed to write ?
729 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
730 return false;
731
732 fLatestWrittenTile++;
733
734#ifdef __EXCEPTIONS
735 try
736 {
737#endif
738 //could not write the data to disk
739 if (!writeCompressedDataToDisk(target.data.get(), target.size))
740 fErrno = errno;
741#ifdef __EXCEPTIONS
742 }
743 catch (...)
744 {
745 fThreadsException = std::current_exception();
746 if (fNumQueues == 0)
747 std::rethrow_exception(fThreadsException);
748 }
749#endif
750 return true;
751 }
752
753 /// Compress a given buffer based on its source and destination
754 //src cannot be const, as applySMOOTHING is done in place
755 /// @param dest the buffer hosting the compressed data
756 /// @param src the buffer hosting the transposed data
757 /// @param num_rows the number of uncompressed rows in the transposed buffer
758 /// @param the number of bytes of the compressed data
759 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
760 {
761 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
762 uint32_t offset = 0;
763
764 //skip the checksum reserved area
765 dest += 4;
766
767 //skip the 'TILE' marker and tile size entry
768 uint64_t compressedOffset = sizeof(FITS::TileHeader);
769
770 //now compress each column one by one by calling compression on arrays
771 for (uint32_t i=0;i<fRealColumns.size();i++)
772 {
773 catalog_row[i].second = compressedOffset;
774
775 if (fRealColumns[i].col.num == 0)
776 continue;
777
778 FITS::Compression& head = fRealColumns[i].block_head;
779
780 //set the default byte telling if uncompressed the compressed Flag
781 const uint64_t previousOffset = compressedOffset;
782
783 //skip header data
784 compressedOffset += head.getSizeOnDisk();
785
786 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
787 {
788 switch (head.getProc(j))
789 {
790 case FITS::kFactRaw:
791 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
792 break;
793
794 case FITS::kFactSmoothing:
795 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
796 break;
797
798 case FITS::kFactHuffman16:
799 if (head.getOrdering() == FITS::kOrderByCol)
800 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
801 else
802 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
803 break;
804 }
805 }
806
807 //check if compressed size is larger than uncompressed
808 //if so set flag and redo it uncompressed
809 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
810 {
811 //de-smooth !
812 if (head.getProc(0) == FITS::kFactSmoothing)
813 UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
814
815 FITS::Compression he;
816
817 compressedOffset = previousOffset + he.getSizeOnDisk();
818 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
819
820 he.SetBlockSize(compressedOffset - previousOffset);
821 he.Memcpy(dest+previousOffset);
822
823 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
824
825 catalog_row[i].first = compressedOffset - catalog_row[i].second;
826 continue;
827 }
828
829 head.SetBlockSize(compressedOffset - previousOffset);
830 head.Memcpy(dest + previousOffset);
831
832 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
833 catalog_row[i].first = compressedOffset - catalog_row[i].second;
834 }
835
836 const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
837 memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
838
839 return compressedOffset;
840 }
841
842 /// Transpose a tile to a new buffer
843 /// @param src buffer hosting the regular, row-ordered data
844 /// @param dest the target buffer that will receive the transposed data
845 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
846 {
847 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
848
849 //copy the tile and transpose it
850 for (uint32_t i=0;i<fRealColumns.size();i++)
851 {
852 switch (fRealColumns[i].block_head.getOrdering())
853 {
854 case FITS::kOrderByRow:
855 //regular, "semi-transposed" copy
856 for (uint32_t k=0;k<thisRoundNumRows;k++)
857 {
858 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
859 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
860 }
861 break;
862
863 case FITS::kOrderByCol:
864 //transposed copy
865 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
866 for (uint32_t k=0;k<thisRoundNumRows;k++)
867 {
868 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
869 dest += fRealColumns[i].col.size;
870 }
871 break;
872 };
873 }
874 }
875
876 /// Specific compression functions
877 /// @param dest the target buffer
878 /// @param src the source buffer
879 /// @param size number of bytes to copy
880 /// @return number of bytes written
881 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
882 {
883 memcpy(dest, src, size);
884 return size;
885 }
886
887 /// Do huffman encoding
888 /// @param dest the buffer that will receive the compressed data
889 /// @param src the buffer hosting the transposed data
890 /// @param numRows number of rows of data in the transposed buffer
891 /// @param sizeOfElems size in bytes of one data elements
892 /// @param numRowElems number of elements on each row
893 /// @return number of bytes written
894 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
895 {
896 std::string huffmanOutput;
897 uint32_t previousHuffmanSize = 0;
898
899 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
900 if (numRows < 2)
901 return numRows*sizeOfElems*numRowElems + 1000;
902
903 if (sizeOfElems < 2 )
904 {
905#ifdef __EXCEPTIONS
906 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
907#else
908 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
909 return 0;
910#endif
911 }
912
913 uint32_t huffmanOffset = 0;
914 for (uint32_t j=0;j<numRowElems;j++)
915 {
916 Huffman::Encode(huffmanOutput,
917 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
918 numRows*(sizeOfElems/2));
919 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
920 huffmanOffset += sizeof(uint32_t);
921 previousHuffmanSize = huffmanOutput.size();
922 }
923
924 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
925
926 //only copy if not larger than not-compressed size
927 if (totalSize < numRows*sizeOfElems*numRowElems)
928 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
929
930 return totalSize;
931 }
932
933 /// Applies Thomas' DRS4 smoothing
934 /// @param data where to apply it
935 /// @param numElems how many elements of type int16_t are stored in the buffer
936 /// @return number of bytes modified
937 uint32_t applySMOOTHING(char* data, uint32_t numElems)
938 {
939 int16_t* short_data = reinterpret_cast<int16_t*>(data);
940 for (int j=numElems-1;j>1;j--)
941 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
942
943 return numElems*sizeof(int16_t);
944 }
945
946 /// Apply the inverse transform of the integer smoothing
947 /// @param data where to apply it
948 /// @param numElems how many elements of type int16_t are stored in the buffer
949 /// @return number of bytes modified
950 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
951 {
952 int16_t* short_data = reinterpret_cast<int16_t*>(data);
953 for (uint32_t j=2;j<numElems;j++)
954 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
955
956 return numElems*sizeof(uint16_t);
957 }
958
959
960
961 //thread related stuff
962 MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
963 int32_t fNumQueues; ///< Current number of threads that will be used by this object
964 uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
965 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
966
967 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
968 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)
969
970 // catalog related stuff
971 CatalogType fCatalog; ///< Catalog for this file
972 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
973 uint32_t fNumTiles; ///< Number of pre-reserved tiles
974 uint32_t fNumRowsPerTile; ///< Number of rows per tile
975 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file
976
977 // checksum related stuff
978 Checksum fCatalogSum; ///< Checksum of the catalog
979 Checksum fRawSum; ///< Raw sum (specific to FACT)
980 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
981
982 // data layout related stuff
983 /// Regular columns augmented with compression informations
984 struct CompressedColumn
985 {
986 CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
987 block_head(h)
988 {}
989#ifdef __MARS__ // Needed for the compilation ofthe dictionary
990 CompressedColumn() { }
991#endif
992 Table::Column col; ///< the regular column entry
993 FITS::Compression block_head; ///< the compression data associated with that column
994 };
995 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
996 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
997 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
998 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression
999
1000 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
1001 int fErrno; ///< propagate errno to main thread
1002};
1003
1004#endif
Note: See TracBrowser for help on using the repository browser.