source: trunk/Mars/mcore/zofits.h@ 17267

Last change on this file since 17267 was 17264, checked in by tbretz, 11 years ago
Hopefully finished removing the std namespace from the headers.
File size: 39.2 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef USE_BOOST_THREADS
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
23 template<class S>
24 struct QueueMin : std::list<S>
25 {
26 typename std::list<S>::iterator begin()
27 {
28 return min_element(std::list<S>::begin(), std::list<S>::end());
29 }
30 };
31
32
33 //catalog types
34 struct CatalogEntry
35 {
36 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
37 int64_t first; ///< Size of this column in the tile
38 int64_t second; ///< offset of this column in the tile, from the start of the heap area
39 } __attribute__((__packed__));
40
41 typedef std::vector<CatalogEntry> CatalogRow;
42 typedef std::list<CatalogRow> CatalogType;
43
44
45 /// Parameters required to write a tile to disk
46 struct WriteTarget
47 {
48 bool operator < (const WriteTarget& other)
49 {
50 return tile_num < other.tile_num;
51 }
52
53 WriteTarget() { }
54 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
55
56 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
57 uint32_t size; ///< Size to write
58 std::shared_ptr<char> data; ///< Memory block to write
59 };
60
61
    /// Parameters required to compress a tile of data.
    /// Bundles the raw rows, the scratch buffer for their transposed copy and
    /// the WriteTarget that will eventually carry the result to disk.
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow& catalog_entry;            ///< Reference to the catalog entry to deal with (filled with sizes/offsets during compression)
        std::shared_ptr<char> src;            ///< Original data
        std::shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget target;                   ///< Compressed data
        uint32_t num_rows;                    ///< Number of rows to compress
    };
74
75public:
76 /// static setter for the default number of threads to use. -1 means all available physical cores
77 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
78 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
79 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
80 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
81
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory can be used by the compression buffers
    ///        (note: the value is multiplied by 1000 before use, i.e. it is in
    ///        units of 1000 bytes — TODO confirm intended unit)
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        // shared initialization path for both constructors
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
95
    /// Constructor that also opens the target file.
    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory can be used by the compression buffers
    ///        (note: the value is multiplied by 1000 before use, i.e. it is in
    ///        units of 1000 bytes — TODO confirm intended unit)
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        // shared initialization path for both constructors
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
110
    // Initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;  // no partial checksum word pending yet
        fNumQueues = 0;    // worker threads are configured later via SetNumThreads

        fNumTiles = nt==0 ? 1 : nt;  // at least one catalog tile must be reserved
        fNumRowsPerTile = rpt;

        fRealRowWidth = 0;
        fCatalogOffset = 0;
        fCatalogSize = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = std::exception_ptr();  // no pending exception from worker threads
#endif
        fErrno = 0;
    }
133
    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        // buffers can only be sized once all columns have been added
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }
158
    /// open a new file.
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the EXTNAME keyword should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries (placeholders, finalized in close())
        SetBool( "ZTABLE", true, "Table is compressed");
        SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt( "ZPCOUNT", 0, "");
        SetInt( "ZHEAPPTR", 0, "");
        SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt( "THEAP", 0, "");
        SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");
        SetInt( "ZSHRINK", 1, "Catalog shrink factor");

        // start from a clean slate: catalog and raw checksum restart per file
        fCatalogSize = 0;
        fCatalog.clear();
        fRawSum.reset();
    }
182
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (subclasses dealing with DRS calibration are expected to override it)
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
189
    /// Returns the number of bytes per uncompressed row
    /// (i.e. the accumulated width of all columns added so far)
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
196
    /// Write the data catalog
    /// @return the state of the file
    bool WriteCatalog()
    {
        // one catalog row = 2 64-bit values (size, offset) per column
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing (revcpy reverses the byte order
        // of each 64-bit word for the on-disk representation)
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // zero the reserved-but-not-yet-used catalog rows
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        // return to the previous write position (unless we were already there)
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
237
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// Subclasses override this hook to calibrate one row in place before compression.
    virtual void DrsOffsetCalibrate(char* )
    {

    }
243
244 CatalogRow& AddOneCatalogRow()
245 {
246 // add one row to the catalog
247 fCatalog.emplace_back(CatalogRow());
248 fCatalog.back().resize(fTable.num_cols);
249 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
250 *it = CatalogEntry(0,0);
251
252 fCatalogSize++;
253
254 return fCatalog.back();
255 }
256
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal the uncompressed row width)
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif
        //copy current row to pool or rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        // zero the first and last 4 bytes of the checksum buffer so that bytes
        // outside the copied row do not contribute to the sum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // in-place calibration hook (no-op in zofits itself)
        DrsOffsetCalibrate(target_location);

        // tile not complete yet: nothing to compress, report last writer error
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // a full tile has been accumulated: hand it over for compression
        CompressionTarget compress_target(AddOneCatalogRow());
        SetNextCompression(compress_target);

        //no worker threads. do everything in-line
        if (fNumQueues == 0)
        {
            const uint64_t size_to_write = CompressBuffer(compress_target);

            // NOTE(review): this throw is not guarded by __EXCEPTIONS like the
            // other error paths in this file — verify for -fno-exceptions builds
            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw std::runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");

            // The correct 'errno' is set, because it is the main thread.
            return good();
        }

        //if all queues are empty, use queue 0
        uint32_t min_index = 0;
        uint32_t min_size = std::numeric_limits<uint32_t>::max();
        uint32_t current_index = 0;

        // pick the least loaded compression queue
        for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
        {
            if (it->size() < min_size)
            {
                min_index = current_index;
                min_size = it->size();
            }
            current_index++;
        }

        if (!fCompressionQueues[min_index].emplace(compress_target))
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");

        // propagate any error reported by the writer thread
        errno = fErrno;
        return errno==0;
    }
355
    /// update the real number of rows
    void FlushNumRows()
    {
        // NAXIS2 counts the catalog rows (i.e. started tiles),
        // ZNAXIS2 keeps the true number of uncompressed data rows
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
363
    /// Setup the environment to compress yet another tile of data
    /// @param target the struct where to host the produced parameters
    void SetNextCompression(CompressionTarget& target)
    {
        //fill up compression target
        target.src = fSmartBuffer;                  // hand over the buffer holding the raw rows
        target.transposed_src = fMemPool.malloc();  // scratch buffer for the transposed copy
        target.num_rows = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;  // 0-based index of the tile being finished
        write_target.size = 0;                                        // filled in after compression
        write_target.data = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
    }
382
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Groups of shrink_factor consecutive tiles are merged so that at most
    /// fNumTiles catalog rows remain; ZTILELEN and ZSHRINK are updated accordingly.
    void ShrinkCatalog()
    {
        //add empty row to get either the target number of rows, or a multiple of the allowed size
        // NOTE(review): adding fCatalogSize%fNumTiles rows reaches a multiple of
        // fNumTiles only for some sizes (e.g. 1500+500=2000 works, 1200+200=1400
        // does not) — verify the intended padding formula
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize > fNumTiles)
        {
            const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above

            //shrink the catalog !
            // entry i of the shrunk catalog is taken from entry i*shrink_factor
            // of the original one; entry 0 stays where it is
            uint32_t entry_id = 1;
            auto it = fCatalog.begin();
            it++;
            for (; it != fCatalog.end(); it++)
            {
                if (entry_id >= fNumTiles) break;

                uint32_t target_id = entry_id*shrink_factor;

                // advance a second iterator to the source row (std::list has no random access)
                auto jt = it;
                for (uint32_t i=0;i<target_id-entry_id;i++)
                    jt++;

                *it = *jt;

                entry_id++;
            }

            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
            //remove the too many entries
            for (uint32_t i=0;i<num_tiles_to_remove;i++)
            {
                fCatalog.pop_back();
                fCatalogSize--;
            }
            //update header keywords
            fNumRowsPerTile *= shrink_factor;

            SetInt("ZTILELEN", fNumRowsPerTile);
            SetInt("ZSHRINK", shrink_factor);
        }
    }
428
    /// close an open file: flush the last (possibly partial) tile, finalize the
    /// catalog/heap bookkeeping keywords and verify the checksum.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            //set number of threads to zero before calling compressBuffer
            const int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;

            const uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            // NOTE(review): this throw is not guarded by __EXCEPTIONS like the
            // other error paths in this file — verify for -fno-exceptions builds
            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw std::runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
        }

        AlignTo2880Bytes();

        // compute the total heap size and assign each catalog entry its final
        // offset from the start of the heap
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;   // empty columns get a null offset
            }
        }

        ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string(fRawSum.val()));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        std::ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        // checksum mismatch: report it
        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
530
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// (kFactRaw = no compression applied)
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
537
    /// Overload of the simplified compressed version
    /// @param comp the compression scheme to be applied to this column
    /// @param cnt number of elements per row in this column
    /// @param typechar FITS type character of one element
    /// @param name column name
    /// @param unit physical unit of the column
    /// @param comment header comment for the column
    /// @param addHeaderKeys whether the usual column header keys should be written
    /// @return whether the column could be added
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        // the visible (compressed) column is declared as a single variable-length
        // 'Q' entry; the real, uncompressed layout is remembered in fRealColumns
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;  // columns are laid out back-to-back in a raw row

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+std::to_string(fRealColumns.size()), std::to_string(cnt)+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string(fRealColumns.size()), "FACT", "Compression type: FACT");

        return true;
    }
563
    /// Get the actual number of compression threads (queues) used by this object
    int32_t GetNumThreads() const { return fNumQueues;}
    /// Set the number of compression threads (queues). Only allowed while the
    /// file is closed.
    /// @param num requested number of threads; clamped to the number of
    ///        available cores minus two (one for the main thread, one for writing)
    /// @return true if the number of threads could be (re)configured
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // resizing recreates the (not yet started) worker queues
        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }
600
protected:

    /// Allocates the required objects once the total row width is known.
    void reallocateBuffers()
    {
        // one chunk = one full tile + per-column block headers + tile header
        // + 8 spare bytes for checksum word alignment
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
612
    /// Actually does the writing to disk (and checksuming)
    /// The checksum works on 4-byte words: the 4 reserved bytes in front of the
    /// payload allow extending the checksummed range backwards to a word boundary.
    /// @param src the buffer to write (payload starts at src+4)
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        if (fCheckOffset != 0)
        {
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember how far the next block's checksum must reach back
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
649
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;

        //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
//        for (uint32_t i=0;i<thisRoundNumRows;i++)
//        {
//            char* target_location = target.src.get() + fRealRowWidth*i;
//            cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
//            DrsOffsetCalibrate(target_location);
//        }

#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // remember the exception so the main thread can re-throw it
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        // in-line mode: hand the size back to the caller.
        // NOTE(review): the uint32_t return truncates compressed_size — fine as
        // long as a single compressed tile stays below 4GB; verify for huge tiles
        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        fWriteToDiskQueue.emplace(target.target, compressed_size);

        // if used by the queue, always return true as the elements are not ordered
        return 1;
    }
697
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    /// @return false when the tile is not the next one expected on disk (the
    ///         queue will offer it again later), true otherwise
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;   // remember the error for the main thread
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // remember the exception so the main thread can re-throw it
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
726
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog row to be filled with per-column sizes and offsets
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // the last tile of the file may contain fewer than fNumRowsPerTile rows
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            // apply the column's processing sequence one step after another
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                    case FITS::kFactRaw:
                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactSmoothing:
                        // in-place preprocessing step: writes nothing to dest
                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactHuffman16:
                        if (head.getOrdering() == FITS::kOrderByCol)
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                        else
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                        break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth !
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            // write the per-column block header in front of the compressed data
            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally prepend the tile header (row count + total compressed size)
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
815
    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        // the last tile of the file may contain fewer than fNumRowsPerTile rows
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
                case FITS::kOrderByRow:
                    //regular, "semi-transposed" copy: each row's column data
                    //stays together, but the columns are split apart
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                    }
                    break;

                case FITS::kOrderByCol:
                    //transposed copy: all values of one array element across
                    //the rows become contiguous
                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                        for (uint32_t k=0;k<thisRoundNumRows;k++)
                        {
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                            dest += fRealColumns[i].col.size;
                        }
                    break;
            };
        }
    }
849
    /// Specific compression functions
    /// kFactRaw "compression": a plain copy of the source into the target buffer.
    /// @param dest the target buffer
    /// @param src the source buffer
    /// @param size number of bytes to copy
    /// @return number of bytes written (always exactly @c size)
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }
860
    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        // the encoder works on 16-bit symbols: elements must be at least 2 bytes
        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        // encode element-by-element; in front of the payload one 32-bit length
        // per element records how many bytes its encoding took
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        //(the caller detects the overflow via the returned size and falls back to raw storage)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
906
907 /// Applies Thomas' DRS4 smoothing
908 /// @param data where to apply it
909 /// @param numElems how many elements of type int16_t are stored in the buffer
910 /// @return number of bytes modified
911 uint32_t applySMOOTHING(char* data, uint32_t numElems)
912 {
913 int16_t* short_data = reinterpret_cast<int16_t*>(data);
914 for (int j=numElems-1;j>1;j--)
915 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
916
917 return numElems*sizeof(int16_t);
918 }
919
920 /// Apply the inverse transform of the integer smoothing
921 /// @param data where to apply it
922 /// @param numElems how many elements of type int16_t are stored in the buffer
923 /// @return number of bytes modified
924 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
925 {
926 int16_t* short_data = reinterpret_cast<int16_t*>(data);
927 for (uint32_t j=2;j<numElems;j++)
928 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
929
930 return numElems*sizeof(uint16_t);
931 }
932
933
934
    //thread related stuff
    MemoryManager fMemPool;    ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;        ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;    ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile;///< Index of the last tile written to disk (for correct ordering while using several threads); -1 = none yet

    std::vector<Queue<CompressionTarget>> fCompressionQueues;    ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin pops the lowest tile index first

    // catalog related stuff
    CatalogType fCatalog;      ///< Catalog for this file
    uint32_t fCatalogSize;     ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;        ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile;  ///< Number of rows per tile
    off_t fCatalogOffset;      ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum;      ///< Checksum of the catalog
    Checksum fRawSum;          ///< Raw sum (specific to FACT)
    int32_t fCheckOffset;      ///< offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;            ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;          ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno;                      ///< propagate errno to main thread
975};
976
977#endif
Note: See TracBrowser for help on using the repository browser.