source: trunk/Mars/mcore/zofits.h@ 17761

Last change on this file since 17761 was 17754, checked in by tbretz, 11 years ago
There were some casts missing to allow large memory sizes
File size: 39.6 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22#ifdef __MARS__ // Needed by CINT to access the structures
23public:
24#endif
25 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
26 template<class S>
27 struct QueueMin : std::list<S>
28 {
29 typename std::list<S>::iterator begin()
30 {
31 return min_element(std::list<S>::begin(), std::list<S>::end());
32 }
33 };
34
35#ifdef __CINT__
36 // CINT doesn't like the packed attribute...
37 // Therefore we give another hint of the size of the structure
38 struct CatalogEntry { char dummy[16]; };
39#else
40 //catalog types
41 struct CatalogEntry
42 {
43 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
44 int64_t first; ///< Size of this column in the tile
45 int64_t second; ///< offset of this column in the tile, from the start of the heap area
46 } __attribute__((__packed__));
47#endif
48
49 typedef std::vector<CatalogEntry> CatalogRow;
50 typedef std::list<CatalogRow> CatalogType;
51
52 /// Parameters required to write a tile to disk
53 struct WriteTarget
54 {
55 bool operator < (const WriteTarget& other) const
56 {
57 return tile_num < other.tile_num;
58 }
59
60 WriteTarget() { }
61 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
62
63 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
64 uint32_t size; ///< Size to write
65 std::shared_ptr<char> data; ///< Memory block to write
66 };
67
68
69 /// Parameters required to compress a tile of data
70 struct CompressionTarget
71 {
72 CompressionTarget(CatalogRow& r) : catalog_entry(r)
73 {}
74
75 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
76 std::shared_ptr<char> src; ///< Original data
77 std::shared_ptr<char> transposed_src; ///< Transposed data
78 WriteTarget target; ///< Compressed data
79 uint32_t num_rows; ///< Number of rows to compress
80 };
81
82public:
83 /// static setter for the default number of threads to use. -1 means all available physical cores
84 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
85 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
86 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
87 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
88
89 /// constructors
90 /// @param numTiles how many data groups should be pre-reserved ?
91 /// @param rowPerTile how many rows will be grouped together in a single tile
92 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
93 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
94 uint32_t rowPerTile = DefaultNumRowsPerTile(),
95 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
96 fMemPool(0, size_t(maxUsableMem)*1000),
97 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
98 {
99 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
100 SetNumThreads(DefaultNumThreads());
101 }
102
103 /// @param fname the target filename
104 /// @param numTiles how many data groups should be pre-reserved ?
105 /// @param rowPerTile how many rows will be grouped together in a single tile
106 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
107 zofits(const char* fname,
108 uint32_t numTiles = DefaultMaxNumTiles(),
109 uint32_t rowPerTile = DefaultNumRowsPerTile(),
110 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
111 fMemPool(0, size_t(maxUsableMem)*1000),
112 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
113 {
114 open(fname);
115 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
116 SetNumThreads(DefaultNumThreads());
117 }
118
119 zofits(const std::string &fname,
120 uint32_t numTiles = DefaultMaxNumTiles(),
121 uint32_t rowPerTile = DefaultNumRowsPerTile(),
122 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
123 fMemPool(0, size_t(maxUsableMem)*1000),
124 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
125 {
126 open(fname);
127 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
128 SetNumThreads(DefaultNumThreads());
129 }
130
131 //initialization of member variables
132 /// @param nt number of tiles
133 /// @param rpt number of rows per tile
134 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
135 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
136 {
137 fCheckOffset = 0;
138 fNumQueues = 0;
139
140 fNumTiles = nt==0 ? 1 : nt;
141 fNumRowsPerTile = rpt;
142
143 fRealRowWidth = 0;
144 fCatalogOffset = 0;
145 fCatalogSize = 0;
146
147 fMaxUsableMem = maxUsableMem;
148#ifdef __EXCEPTIONS
149 fThreadsException = std::exception_ptr();
150#endif
151 fErrno = 0;
152 }
153
154 /// write the header of the binary table
155 /// @param name the name of the table to be created
156 /// @return the state of the file
157 virtual bool WriteTableHeader(const char* name="DATA")
158 {
159 reallocateBuffers();
160
161 SetInt("ZNAXIS1", fRealRowWidth);
162
163 ofits::WriteTableHeader(name);
164
165 fCompressionQueues.front().setPromptExecution(fNumQueues==0);
166 fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
167
168 if (fNumQueues != 0)
169 {
170 //start the compression queues
171 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
172 it->start();
173
174 //start the disk writer
175 fWriteToDiskQueue.start();
176 }
177
178 //mark that no tile has been written so far
179 fLatestWrittenTile = -1;
180
181 //no wiring error (in the writing of the data) has occured so far
182 fErrno = 0;
183
184 return good();
185 }
186
187 /// open a new file.
188 /// @param filename the name of the file
189 /// @param Whether or not the name of the extension should be added or not
190 void open(const char* filename, bool addEXTNAMEKey=true)
191 {
192 ofits::open(filename, addEXTNAMEKey);
193
194 //add compression-related header entries
195 SetBool( "ZTABLE", true, "Table is compressed");
196 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
197 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
198 SetInt( "ZPCOUNT", 0, "");
199 SetInt( "ZHEAPPTR", 0, "");
200 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
201 SetInt( "THEAP", 0, "");
202 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
203 SetFloat("ZRATIO", 0, "Compression ratio");
204 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
205
206 fCatalogSize = 0;
207 fRealRowWidth = 0;
208 fCatalogOffset = 0;
209 fCatalogSize = 0;
210 fCheckOffset = 0;
211
212 fRealColumns.clear();
213 fCatalog.clear();
214 fCatalogSum.reset();
215 fRawSum.reset();
216 }
217
218 void open(const std::string &filename, bool addEXTNAMEKey=true)
219 {
220 open(filename.c_str(), addEXTNAMEKey);
221 }
222
223 /// Super method. does nothing as zofits does not know about DrsOffsets
224 /// @return the state of the file
225 virtual bool WriteDrsOffsetsTable()
226 {
227 return good();
228 }
229
230 /// Returns the number of bytes per uncompressed row
231 /// @return number of bytes per uncompressed row
232 uint32_t GetBytesPerRow() const
233 {
234 return fRealRowWidth;
235 }
236
237 /// Write the data catalog
238 /// @return the state of the file
239 bool WriteCatalog()
240 {
241 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
242 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
243
244 // swap the catalog bytes before writing
245 std::vector<char> swapped_catalog(total_catalog_size);
246
247 uint32_t shift = 0;
248 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
249 {
250 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
251 shift += one_catalog_row_size;
252 }
253
254 if (fCatalogSize < fNumTiles)
255 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
256
257 // first time writing ? remember where we are
258 if (fCatalogOffset == 0)
259 fCatalogOffset = tellp();
260
261 // remember where we came from
262 const off_t where_are_we = tellp();
263
264 // write to disk
265 seekp(fCatalogOffset);
266 write(swapped_catalog.data(), total_catalog_size);
267
268 if (where_are_we != fCatalogOffset)
269 seekp(where_are_we);
270
271 // udpate checksum
272 fCatalogSum.reset();
273 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
274
275 return good();
276 }
277
278 /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
279 virtual void DrsOffsetCalibrate(char* )
280 {
281
282 }
283
284 CatalogRow& AddOneCatalogRow()
285 {
286 // add one row to the catalog
287 fCatalog.emplace_back(CatalogRow());
288 fCatalog.back().resize(fTable.num_cols);
289 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
290 *it = CatalogEntry(0,0);
291
292 fCatalogSize++;
293
294 return fCatalog.back();
295 }
296
297 /// write one row of data
298 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
299 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
300 /// errno will then contain the correct error number of the last error which happened during writing.
301 /// @param ptr the source buffer
302 /// @param the number of bytes to write
303 /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
304 bool WriteRow(const void* ptr, size_t cnt, bool = true)
305 {
306 if (cnt != fRealRowWidth)
307 {
308#ifdef __EXCEPTIONS
309 throw std::runtime_error("Wrong size of row given to WriteRow");
310#else
311 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
312 return false;
313#endif
314 }
315
316#ifdef __EXCEPTIONS
317 //check if something hapenned while the compression threads were working
318 //if so, re-throw the exception that was generated
319 if (fThreadsException != std::exception_ptr())
320 std::rethrow_exception(fThreadsException);
321#endif
322
323 //copy current row to pool or rows waiting for compression
324 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
325 memcpy(target_location, ptr, fRealRowWidth);
326
327 //for now, make an extra copy of the data, for RAWSUM checksuming.
328 //Ideally this should be moved to the threads
329 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
330 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
331 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
332 char* buffer = fRawSumBuffer.data() + rawOffset;
333 auto ib = fRawSumBuffer.begin();
334 auto ie = fRawSumBuffer.rbegin();
335 *ib++ = 0;
336 *ib++ = 0;
337 *ib++ = 0;
338 *ib = 0;
339
340 *ie++ = 0;
341 *ie++ = 0;
342 *ie++ = 0;
343 *ie = 0;
344
345 memcpy(buffer, ptr, fRealRowWidth);
346
347 fRawSum.add(fRawSumBuffer, false);
348
349 fTable.num_rows++;
350
351 if (fTable.num_rows % fNumRowsPerTile != 0)
352 {
353 errno = fErrno;
354 return errno==0;
355 }
356
357 // use the least occupied queue
358 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
359
360 if (!imin->emplace(InitNextCompression()))
361 {
362#ifdef __EXCEPTIONS
363 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
364#else
365 gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
366 errno = 0;
367 return false;
368#endif
369 }
370
371 errno = fErrno;
372 return errno==0;
373 }
374
375 /// update the real number of rows
376 void FlushNumRows()
377 {
378 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
379 SetInt("ZNAXIS2", fTable.num_rows);
380 FlushHeader();
381 }
382
383 /// Setup the environment to compress yet another tile of data
384 /// @param target the struct where to host the produced parameters
385 CompressionTarget InitNextCompression()
386 {
387 CompressionTarget target(AddOneCatalogRow());
388
389 //fill up compression target
390 target.src = fSmartBuffer;
391 target.transposed_src = fMemPool.malloc();
392 target.num_rows = fTable.num_rows;
393
394 //fill up write to disk target
395 WriteTarget &write_target = target.target;
396 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
397 write_target.size = 0;
398 write_target.data = fMemPool.malloc();
399
400 //get a new buffer to host the incoming data
401 fSmartBuffer = fMemPool.malloc();
402
403 return target;
404 }
405
406 /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
407 uint32_t ShrinkCatalog()
408 {
409 //add empty row to get either the target number of rows, or a multiple of the allowed size
410 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
411 AddOneCatalogRow();
412
413 //did we write more rows than what the catalog could host ?
414 if (fCatalogSize <= fNumTiles) // nothing to do
415 return 1;
416
417 //always exact as extra rows were added just above
418 const uint32_t shrink_factor = fCatalogSize / fNumTiles;
419
420 //shrink the catalog !
421 uint32_t entry_id = 1;
422 auto it = fCatalog.begin();
423 it++;
424 for (; it != fCatalog.end(); it++)
425 {
426 if (entry_id >= fNumTiles)
427 break;
428
429 const uint32_t target_id = entry_id*shrink_factor;
430
431 auto jt = it;
432 for (uint32_t i=0; i<target_id-entry_id; i++)
433 jt++;
434
435 *it = *jt;
436
437 entry_id++;
438 }
439
440 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
441
442 //remove the too many entries
443 for (uint32_t i=0;i<num_tiles_to_remove;i++)
444 {
445 fCatalog.pop_back();
446 fCatalogSize--;
447 }
448
449 //update header keywords
450 fNumRowsPerTile *= shrink_factor;
451
452 SetInt("ZTILELEN", fNumRowsPerTile);
453 SetInt("ZSHRINK", shrink_factor);
454
455 return shrink_factor;
456 }
457
458 /// close an open file.
459 /// @return the state of the file
460 bool close()
461 {
462 // stop compression and write threads
463 for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
464 it->wait();
465
466 fWriteToDiskQueue.wait();
467
468 if (tellp() < 0)
469 return false;
470
471#ifdef __EXCEPTIONS
472 //check if something hapenned while the compression threads were working
473 //if so, re-throw the exception that was generated
474 if (fThreadsException != std::exception_ptr())
475 std::rethrow_exception(fThreadsException);
476#endif
477
478 //write the last tile of data (if any)
479 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
480 {
481 fWriteToDiskQueue.enablePromptExecution();
482 fCompressionQueues.front().enablePromptExecution();
483 fCompressionQueues.front().emplace(InitNextCompression());
484 }
485
486 AlignTo2880Bytes();
487
488 int64_t heap_size = 0;
489 int64_t compressed_offset = 0;
490 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
491 {
492 compressed_offset += sizeof(FITS::TileHeader);
493 heap_size += sizeof(FITS::TileHeader);
494 for (uint32_t j=0; j<it->size(); j++)
495 {
496 heap_size += (*it)[j].first;
497 (*it)[j].second = compressed_offset;
498 compressed_offset += (*it)[j].first;
499 if ((*it)[j].first == 0)
500 (*it)[j].second = 0;
501 }
502 }
503
504 const uint32_t shrink_factor = ShrinkCatalog();
505
506 //update header keywords
507 SetInt("ZNAXIS1", fRealRowWidth);
508 SetInt("ZNAXIS2", fTable.num_rows);
509
510 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
511
512 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
513 const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
514
515 SetInt("THEAP", total_num_tiles_written*total_catalog_width);
516 SetInt("NAXIS1", total_catalog_width);
517 SetInt("NAXIS2", total_num_tiles_written);
518 SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
519
520 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
521 SetFloat("ZRATIO", compression_ratio);
522
523 //add to the heap size the size of the gap between the catalog and the actual heap
524 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
525
526 SetInt("PCOUNT", heap_size, "size of special data area");
527
528 //Just for updating the fCatalogSum value
529 WriteCatalog();
530
531 fDataSum += fCatalogSum;
532
533 const Checksum checksm = UpdateHeaderChecksum();
534
535 if (!fFilebuf.close())
536 setstate(ios_base::failbit);
537
538 fSmartBuffer = std::shared_ptr<char>();
539
540 //restore the number of rows per tile in case the catalog has been shrinked
541 if (shrink_factor != 1)
542 fNumRowsPerTile /= shrink_factor;
543
544 if ((checksm+fDataSum).valid())
545 return true;
546
547 std::ostringstream sout;
548 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
549#ifdef __EXCEPTIONS
550 throw std::runtime_error(sout.str());
551#else
552 gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
553 return false;
554#endif
555 }
556
557 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
558 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
559 const std::string& comment="", bool addHeaderKeys=true)
560 {
561 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
562 }
563
564 /// Overload of the simplified compressed version
565 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
566 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
567 {
568 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
569 return false;
570
571 const size_t size = SizeFromType(typechar);
572
573 Table::Column col;
574 col.name = name;
575 col.type = typechar;
576 col.num = cnt;
577 col.size = size;
578 col.offset = fRealRowWidth;
579
580 fRealRowWidth += size*cnt;
581
582 fRealColumns.emplace_back(col, comp);
583
584 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
585 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
586
587 return true;
588 }
589
590 /// Get and set the actual number of threads for this object
591 int32_t GetNumThreads() const { return fNumQueues; }
592 bool SetNumThreads(uint32_t num)
593 {
594 if (tellp()>0)
595 {
596#ifdef __EXCEPTIONS
597 throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
598#else
599 gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
600#endif
601 return false;
602 }
603
604 //get number of physically available threads
605#ifdef HAVE_BOOST_THREAD
606 unsigned int num_available_cores = boost::thread::hardware_concurrency();
607#else
608 unsigned int num_available_cores = std::thread::hardware_concurrency();
609#endif
610 // could not detect number of available cores from system properties...
611 if (num_available_cores == 0)
612 num_available_cores = 1;
613
614 // leave one core for the main thread and one for the writing
615 if (num > num_available_cores)
616 num = num_available_cores>2 ? num_available_cores-2 : 1;
617
618 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
619 fNumQueues = num;
620
621 return true;
622 }
623
624 uint32_t GetNumTiles() const { return fNumTiles; }
625 void SetNumTiles(uint32_t num) { fNumTiles=num; }
626
627protected:
628
629 /// Allocates the required objects.
630 void reallocateBuffers()
631 {
632 const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
633 fMemPool.setChunkSize(chunk_size);
634
635 fSmartBuffer = fMemPool.malloc();
636 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
637 }
638
639 /// Actually does the writing to disk (and checksuming)
640 /// @param src the buffer to write
641 /// @param sizeToWrite how many bytes should be written
642 /// @return the state of the file
643 bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
644 {
645 char* checkSumPointer = src+4;
646 int32_t extraBytes = 0;
647 uint32_t sizeToChecksum = sizeToWrite;
648
649 //should we extend the array to the left ?
650 if (fCheckOffset != 0)
651 {
652 sizeToChecksum += fCheckOffset;
653 checkSumPointer -= fCheckOffset;
654 memset(checkSumPointer, 0, fCheckOffset);
655 }
656
657 //should we extend the array to the right ?
658 if (sizeToChecksum%4 != 0)
659 {
660 extraBytes = 4 - (sizeToChecksum%4);
661 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
662 sizeToChecksum += extraBytes;
663 }
664
665 //do the checksum
666 fDataSum.add(checkSumPointer, sizeToChecksum);
667
668 fCheckOffset = (4 - extraBytes)%4;
669
670 //write data to disk
671 write(src+4, sizeToWrite);
672
673 return good();
674 }
675
676 /// Compress a given buffer based on the target. This is the method executed by the threads
677 /// @param target the struct hosting the parameters of the compression
678 /// @return number of bytes of the compressed data, or always 1 when used by the Queues
679 bool CompressBuffer(const CompressionTarget& target)
680 {
681 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
682 //Really do not understand what's wrong...
683 //calibrate data if required
684 const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
685 for (uint32_t i=0;i<thisRoundNumRows;i++)
686 {
687 char* target_location = target.src.get() + fRealRowWidth*i;
688 DrsOffsetCalibrate(target_location);
689 }
690#ifdef __EXCEPTIONS
691 try
692 {
693#endif
694 //transpose the original data
695 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
696
697 //compress the buffer
698 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
699
700 //post the result to the writing queue
701 //get a copy so that it becomes non-const
702 fWriteToDiskQueue.emplace(target.target, compressed_size);
703
704#ifdef __EXCEPTIONS
705 }
706 catch (...)
707 {
708 fThreadsException = std::current_exception();
709 if (fNumQueues == 0)
710 std::rethrow_exception(fThreadsException);
711 }
712#endif
713
714 return true;
715 }
716
717 /// Write one compressed tile to disk. This is the method executed by the writing thread
718 /// @param target the struct hosting the write parameters
719 bool WriteBufferToDisk(const WriteTarget& target)
720 {
721 //is this the tile we're supposed to write ?
722 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
723 return false;
724
725 fLatestWrittenTile++;
726
727#ifdef __EXCEPTIONS
728 try
729 {
730#endif
731 //could not write the data to disk
732 if (!writeCompressedDataToDisk(target.data.get(), target.size))
733 fErrno = errno;
734#ifdef __EXCEPTIONS
735 }
736 catch (...)
737 {
738 fThreadsException = std::current_exception();
739 if (fNumQueues == 0)
740 std::rethrow_exception(fThreadsException);
741 }
742#endif
743 return true;
744 }
745
746 /// Compress a given buffer based on its source and destination
747 //src cannot be const, as applySMOOTHING is done in place
748 /// @param dest the buffer hosting the compressed data
749 /// @param src the buffer hosting the transposed data
750 /// @param num_rows the number of uncompressed rows in the transposed buffer
751 /// @param the number of bytes of the compressed data
752 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
753 {
754 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
755 uint32_t offset = 0;
756
757 //skip the checksum reserved area
758 dest += 4;
759
760 //skip the 'TILE' marker and tile size entry
761 uint64_t compressedOffset = sizeof(FITS::TileHeader);
762
763 //now compress each column one by one by calling compression on arrays
764 for (uint32_t i=0;i<fRealColumns.size();i++)
765 {
766 catalog_row[i].second = compressedOffset;
767
768 if (fRealColumns[i].col.num == 0)
769 continue;
770
771 FITS::Compression& head = fRealColumns[i].block_head;
772
773 //set the default byte telling if uncompressed the compressed Flag
774 const uint64_t previousOffset = compressedOffset;
775
776 //skip header data
777 compressedOffset += head.getSizeOnDisk();
778
779 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
780 {
781 switch (head.getProc(j))
782 {
783 case FITS::kFactRaw:
784 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
785 break;
786
787 case FITS::kFactSmoothing:
788 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
789 break;
790
791 case FITS::kFactHuffman16:
792 if (head.getOrdering() == FITS::kOrderByCol)
793 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
794 else
795 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
796 break;
797 }
798 }
799
800 //check if compressed size is larger than uncompressed
801 //if so set flag and redo it uncompressed
802 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
803 {
804 //de-smooth !
805 if (head.getProc(0) == FITS::kFactSmoothing)
806 UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
807
808 FITS::Compression he;
809
810 compressedOffset = previousOffset + he.getSizeOnDisk();
811 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
812
813 he.SetBlockSize(compressedOffset - previousOffset);
814 he.Memcpy(dest+previousOffset);
815
816 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
817
818 catalog_row[i].first = compressedOffset - catalog_row[i].second;
819 continue;
820 }
821
822 head.SetBlockSize(compressedOffset - previousOffset);
823 head.Memcpy(dest + previousOffset);
824
825 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
826 catalog_row[i].first = compressedOffset - catalog_row[i].second;
827 }
828
829 const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
830 memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
831
832 return compressedOffset;
833 }
834
835 /// Transpose a tile to a new buffer
836 /// @param src buffer hosting the regular, row-ordered data
837 /// @param dest the target buffer that will receive the transposed data
838 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
839 {
840 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
841
842 //copy the tile and transpose it
843 for (uint32_t i=0;i<fRealColumns.size();i++)
844 {
845 switch (fRealColumns[i].block_head.getOrdering())
846 {
847 case FITS::kOrderByRow:
848 //regular, "semi-transposed" copy
849 for (uint32_t k=0;k<thisRoundNumRows;k++)
850 {
851 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
852 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
853 }
854 break;
855
856 case FITS::kOrderByCol:
857 //transposed copy
858 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
859 for (uint32_t k=0;k<thisRoundNumRows;k++)
860 {
861 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
862 dest += fRealColumns[i].col.size;
863 }
864 break;
865 };
866 }
867 }
868
869 /// Specific compression functions
870 /// @param dest the target buffer
871 /// @param src the source buffer
872 /// @param size number of bytes to copy
873 /// @return number of bytes written
874 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
875 {
876 memcpy(dest, src, size);
877 return size;
878 }
879
880 /// Do huffman encoding
881 /// @param dest the buffer that will receive the compressed data
882 /// @param src the buffer hosting the transposed data
883 /// @param numRows number of rows of data in the transposed buffer
884 /// @param sizeOfElems size in bytes of one data elements
885 /// @param numRowElems number of elements on each row
886 /// @return number of bytes written
887 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
888 {
889 std::string huffmanOutput;
890 uint32_t previousHuffmanSize = 0;
891
892 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
893 if (numRows < 2)
894 return numRows*sizeOfElems*numRowElems + 1000;
895
896 if (sizeOfElems < 2 )
897 {
898#ifdef __EXCEPTIONS
899 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
900#else
901 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
902 return 0;
903#endif
904 }
905
906 uint32_t huffmanOffset = 0;
907 for (uint32_t j=0;j<numRowElems;j++)
908 {
909 Huffman::Encode(huffmanOutput,
910 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
911 numRows*(sizeOfElems/2));
912 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
913 huffmanOffset += sizeof(uint32_t);
914 previousHuffmanSize = huffmanOutput.size();
915 }
916
917 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
918
919 //only copy if not larger than not-compressed size
920 if (totalSize < numRows*sizeOfElems*numRowElems)
921 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
922
923 return totalSize;
924 }
925
926 /// Applies Thomas' DRS4 smoothing
927 /// @param data where to apply it
928 /// @param numElems how many elements of type int16_t are stored in the buffer
929 /// @return number of bytes modified
930 uint32_t applySMOOTHING(char* data, uint32_t numElems)
931 {
932 int16_t* short_data = reinterpret_cast<int16_t*>(data);
933 for (int j=numElems-1;j>1;j--)
934 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
935
936 return numElems*sizeof(int16_t);
937 }
938
939 /// Apply the inverse transform of the integer smoothing
940 /// @param data where to apply it
941 /// @param numElems how many elements of type int16_t are stored in the buffer
942 /// @return number of bytes modified
943 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
944 {
945 int16_t* short_data = reinterpret_cast<int16_t*>(data);
946 for (uint32_t j=2;j<numElems;j++)
947 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
948
949 return numElems*sizeof(uint16_t);
950 }
951
952
953
954 //thread related stuff
955 MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
956 int32_t fNumQueues; ///< Current number of threads that will be used by this object
957 uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
958 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
959
960 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
961 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)
962
963 // catalog related stuff
964 CatalogType fCatalog; ///< Catalog for this file
965 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
966 uint32_t fNumTiles; ///< Number of pre-reserved tiles
967 uint32_t fNumRowsPerTile; ///< Number of rows per tile
968 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file
969
970 // checksum related stuff
971 Checksum fCatalogSum; ///< Checksum of the catalog
972 Checksum fRawSum; ///< Raw sum (specific to FACT)
973 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
974
975 // data layout related stuff
976 /// Regular columns augmented with compression informations
977 struct CompressedColumn
978 {
979 CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
980 block_head(h)
981 {}
982#ifdef __MARS__ // Needed for the compilation ofthe dictionary
983 CompressedColumn() { }
984#endif
985 Table::Column col; ///< the regular column entry
986 FITS::Compression block_head; ///< the compression data associated with that column
987 };
988 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
989 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
990 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
991 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression
992
993#ifdef __EXCEPTIONS
994 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
995#endif
996 int fErrno; ///< propagate errno to main thread
997};
998
999#endif
Note: See TracBrowser for help on using the repository browser.