source: trunk/Mars/mcore/zofits.h@ 17751

Last change on this file since 17751 was 17751, checked in by tbretz, 10 years ago
I realized that we were one level too high with deriving the class. Not only there was no possibility to use open/close, furthermore the close funtion was not virtual and therefore not necessarily correctly called. By writing a ofstream type class, both is now solved.
File size: 39.4 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22#ifdef __MARS__ // Needed by CINT to access the structures
23public:
24#endif
    /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin.
    /// Used as the container of the write-to-disk queue so that the pending tile
    /// with the smallest tile_num (see WriteTarget::operator<) is processed first.
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };
34
35#ifdef __CINT__
36 struct CatalogEntry;
37#else
38 //catalog types
39 struct CatalogEntry
40 {
41 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
42 int64_t first; ///< Size of this column in the tile
43 int64_t second; ///< offset of this column in the tile, from the start of the heap area
44 } __attribute__((__packed__));
45#endif
46
47 typedef std::vector<CatalogEntry> CatalogRow;
48 typedef std::list<CatalogRow> CatalogType;
49
50 /// Parameters required to write a tile to disk
51 struct WriteTarget
52 {
53 bool operator < (const WriteTarget& other) const
54 {
55 return tile_num < other.tile_num;
56 }
57
58 WriteTarget() { }
59 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
60
61 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
62 uint32_t size; ///< Size to write
63 std::shared_ptr<char> data; ///< Memory block to write
64 };
65
66
67 /// Parameters required to compress a tile of data
68 struct CompressionTarget
69 {
70 CompressionTarget(CatalogRow& r) : catalog_entry(r)
71 {}
72
73 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
74 std::shared_ptr<char> src; ///< Original data
75 std::shared_ptr<char> transposed_src; ///< Transposed data
76 WriteTarget target; ///< Compressed data
77 uint32_t num_rows; ///< Number of rows to compress
78 };
79
80public:
81 /// static setter for the default number of threads to use. -1 means all available physical cores
82 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
83 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
84 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
85 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
86
87 /// constructors
88 /// @param numTiles how many data groups should be pre-reserved ?
89 /// @param rowPerTile how many rows will be grouped together in a single tile
90 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
91 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
92 uint32_t rowPerTile = DefaultNumRowsPerTile(),
93 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
94 fMemPool(0, size_t(maxUsableMem)*1000),
95 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
96 {
97 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
98 SetNumThreads(DefaultNumThreads());
99 }
100
101 /// @param fname the target filename
102 /// @param numTiles how many data groups should be pre-reserved ?
103 /// @param rowPerTile how many rows will be grouped together in a single tile
104 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
105 zofits(const char* fname,
106 uint32_t numTiles = DefaultMaxNumTiles(),
107 uint32_t rowPerTile = DefaultNumRowsPerTile(),
108 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
109 fMemPool(0, maxUsableMem*1000),
110 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
111 {
112 open(fname);
113 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
114 SetNumThreads(DefaultNumThreads());
115 }
116
117 zofits(const std::string &fname,
118 uint32_t numTiles = DefaultMaxNumTiles(),
119 uint32_t rowPerTile = DefaultNumRowsPerTile(),
120 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
121 fMemPool(0, maxUsableMem*1000),
122 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
123 {
124 open(fname);
125 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
126 SetNumThreads(DefaultNumThreads());
127 }
128
129 //initialization of member variables
130 /// @param nt number of tiles
131 /// @param rpt number of rows per tile
132 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
133 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
134 {
135 fCheckOffset = 0;
136 fNumQueues = 0;
137
138 fNumTiles = nt==0 ? 1 : nt;
139 fNumRowsPerTile = rpt;
140
141 fRealRowWidth = 0;
142 fCatalogOffset = 0;
143 fCatalogSize = 0;
144
145 fMaxUsableMem = maxUsableMem;
146#ifdef __EXCEPTIONS
147 fThreadsException = std::exception_ptr();
148#endif
149 fErrno = 0;
150 }
151
    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        // buffers depend on the final row width, so (re-)allocate them now
        reallocateBuffers();

        SetInt("ZNAXIS1", fRealRowWidth);

        ofits::WriteTableHeader(name);

        // with zero queues everything is executed synchronously in the caller's thread
        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no writing error (in the writing of the data) has occurred so far
        fErrno = 0;

        return good();
    }
184
185 /// open a new file.
186 /// @param filename the name of the file
187 /// @param Whether or not the name of the extension should be added or not
188 void open(const char* filename, bool addEXTNAMEKey=true)
189 {
190 ofits::open(filename, addEXTNAMEKey);
191
192 //add compression-related header entries
193 SetBool( "ZTABLE", true, "Table is compressed");
194 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
195 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
196 SetInt( "ZPCOUNT", 0, "");
197 SetInt( "ZHEAPPTR", 0, "");
198 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
199 SetInt( "THEAP", 0, "");
200 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
201 SetFloat("ZRATIO", 0, "Compression ratio");
202 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
203
204 fCatalogSize = 0;
205 fRealRowWidth = 0;
206 fCatalogOffset = 0;
207 fCatalogSize = 0;
208 fCheckOffset = 0;
209
210 fRealColumns.clear();
211 fCatalog.clear();
212 fCatalogSum.reset();
213 fRawSum.reset();
214 }
215
    /// open a new file (std::string overload).
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the name of the extension should be added
    void open(const std::string &filename, bool addEXTNAMEKey=true)
    {
        open(filename.c_str(), addEXTNAMEKey);
    }
220
    /// Super method. does nothing as zofits does not know about DrsOffsets.
    /// Virtual so that a derived class can presumably write its calibration table here.
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
227
    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row (ZNAXIS1)
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
234
    /// Write the data catalog
    /// The catalog lives at a fixed position (fCatalogOffset, recorded on the
    /// first call); later calls seek there, rewrite it and seek back.
    /// @return the state of the file
    bool WriteCatalog()
    {
        // one catalog row = 2 64-bit values (size, offset) per column
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing (revcpy reverses each 8-byte word)
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // zero-fill the rows that are reserved but not used yet
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk at the fixed catalog position
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        // seek back, unless we already were at the catalog position
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum over the freshly written catalog bytes
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
275
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// @param (unnamed) pointer to one row of data — ignored here
    virtual void DrsOffsetCalibrate(char* )
    {

    }
281
282 CatalogRow& AddOneCatalogRow()
283 {
284 // add one row to the catalog
285 fCatalog.emplace_back(CatalogRow());
286 fCatalog.back().resize(fTable.num_cols);
287 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
288 *it = CatalogEntry(0,0);
289
290 fCatalogSize++;
291
292 return fCatalog.back();
293 }
294
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal the uncompressed row width)
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        // zero the first and last 4 bytes of the scratch buffer so that the
        // padded (4-byte aligned) block has a well-defined checksum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // tile not complete yet: report the latest writer-thread error state
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // tile complete: hand it to the least occupied compression queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        if (!imin->emplace(InitNextCompression()))
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
#else
            gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
            errno = 0;
            return false;
#endif
        }

        errno = fErrno;
        return errno==0;
    }
372
373 /// update the real number of rows
374 void FlushNumRows()
375 {
376 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
377 SetInt("ZNAXIS2", fTable.num_rows);
378 FlushHeader();
379 }
380
    /// Setup the environment to compress yet another tile of data
    /// Hands the current fSmartBuffer over to the compression target and
    /// replaces it with a fresh pool buffer for the next incoming rows.
    /// @return the filled compression target for the tile that just completed
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }
403
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Every shrink_factor consecutive tiles are merged into one catalog row so
    /// that at most fNumTiles rows remain; ZTILELEN/ZSHRINK are updated accordingly.
    /// @return the shrink factor applied (1 if nothing was done)
    uint32_t ShrinkCatalog()
    {
        //add empty row to get either the target number of rows, or a multiple of the allowed size
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog !
        // row 0 stays in place; row entry_id takes the content of the row at
        // entry_id*shrink_factor (the first tile of each merged group)
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            // advance a second iterator to the source row (std::list has no random access)
            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the too many entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK", shrink_factor);

        return shrink_factor;
    }
455
    /// close an open file.
    /// Drains the worker queues, flushes the last (partial) tile, finalizes the
    /// catalog and header keywords, verifies the checksum and closes the stream.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        // the queues are stopped, so run the last compression synchronously
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        // compute the total heap size and convert the per-tile column sizes
        // into absolute offsets within the heap
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                // columns without data get a null offset
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        if (!fFilebuf.close())
            setstate(ios_base::failbit);

        // release the row buffer back to the pool
        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrinked
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
554
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// @return false if the underlying AddColumn failed
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
561
    /// Overload of the simplified compressed version
    /// The column is declared to the base class as a single 'Q' entry, while the
    /// real (uncompressed) layout is tracked in fRealColumns and advertised via
    /// the ZFORMn/ZCTYPn keywords.
    /// @return false if the underlying ofits::AddColumn failed
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        // record the real column layout; offset is the running row width
        Table::Column col;
        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");

        return true;
    }
587
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }

    /// Set the number of compression queues/threads (0 = synchronous execution).
    /// Must be called before any data has been written to the file.
    /// @param num requested number of threads; capped to the available cores
    /// @return false if the file is already being written to
    bool SetNumThreads(uint32_t num)
    {
        if (tellp()>0)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
#else
            gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // at least one queue must exist, even in prompt-execution mode
        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }
621
    /// @return the number of pre-reserved catalog tiles
    uint32_t GetNumTiles() const { return fNumTiles; }
    /// Set the number of pre-reserved catalog tiles.
    void SetNumTiles(uint32_t num) { fNumTiles=num; }
624
625protected:
626
    /// Allocates the required objects.
    /// Sizes the memory pool chunks so one chunk can hold a full tile plus all
    /// block/tile headers, and resizes the RAWSUM scratch buffer.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
636
    /// Actually does the writing to disk (and checksuming)
    /// The checksum must be computed on 4-byte boundaries of the data area, so
    /// the block is extended on the left (by the carry-over fCheckOffset from
    /// the previous block) and on the right (to a multiple of 4) with zeros
    /// before being checksummed. The first 4 bytes of src are reserved scratch
    /// space; the actual payload starts at src+4.
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        if (fCheckOffset != 0)
        {
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the alignment carry-over for the next block
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
673
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return always true (required by the Queue interface)
    bool CompressBuffer(const CompressionTarget& target)
    {
        //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        // the last tile may be partial: compute how many rows it really holds
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // remember the exception for the main thread; re-throw right away
            // when running synchronously (no worker queues)
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }
714
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    /// @return false if this is not the next tile in sequence (it stays queued and is retried later)
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk: remember errno for the main thread
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
743
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row catalog row receiving the per-column compressed sizes and tile-relative offsets
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // the last tile may be partial
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            // apply the column's processing sequence in order
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth !
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            // write the compression block header in front of the column data
            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally write the tile header at the start of the tile
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
832
    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    /// @param num_rows number of rows written so far (used to size a possibly partial last tile)
    void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case FITS::kOrderByRow:
                //regular, "semi-transposed" copy: group each column's rows, keep element order
                for (uint32_t k=0;k<thisRoundNumRows;k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case FITS::kOrderByCol:
                //transposed copy: all rows of element j before element j+1
                for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            };
        }
    }
866
867 /// Specific compression functions
868 /// @param dest the target buffer
869 /// @param src the source buffer
870 /// @param size number of bytes to copy
871 /// @return number of bytes written
872 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
873 {
874 memcpy(dest, src, size);
875 return size;
876 }
877
    /// Do huffman encoding
    /// The output layout is: one uint32_t per row element holding the size of
    /// that element's encoded stream, followed by the concatenated streams.
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        // encode each element's column of rows; record per-element stream sizes
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        // (the caller compares the returned size and falls back to raw storage)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
923
924 /// Applies Thomas' DRS4 smoothing
925 /// @param data where to apply it
926 /// @param numElems how many elements of type int16_t are stored in the buffer
927 /// @return number of bytes modified
928 uint32_t applySMOOTHING(char* data, uint32_t numElems)
929 {
930 int16_t* short_data = reinterpret_cast<int16_t*>(data);
931 for (int j=numElems-1;j>1;j--)
932 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
933
934 return numElems*sizeof(int16_t);
935 }
936
937 /// Apply the inverse transform of the integer smoothing
938 /// @param data where to apply it
939 /// @param numElems how many elements of type int16_t are stored in the buffer
940 /// @return number of bytes modified
941 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
942 {
943 int16_t* short_data = reinterpret_cast<int16_t*>(data);
944 for (uint32_t j=2;j<numElems;j++)
945 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
946
947 return numElems*sizeof(uint16_t);
948 }
949


    //thread related stuff
    MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues; ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin keeps tiles ordered by tile_num

    // catalog related stuff
    CatalogType fCatalog; ///< Catalog for this file
    uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles; ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile; ///< Number of rows per tile
    off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum; ///< Checksum of the catalog
    Checksum fRawSum; ///< Raw sum (specific to FACT)
    int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression information
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
#ifdef __MARS__ // Needed for the compilation of the dictionary
        CompressedColumn() { }
#endif
        Table::Column col; ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< propagate errno to main thread
995};
996
997#endif
Note: See TracBrowser for help on using the repository browser.