source: trunk/Mars/mcore/zofits.h@ 17296

Last change on this file since 17296 was 17296, checked in by lyard, 11 years ago
Gave the possibility to change the number of threads, as long as nothing has been written to the file yet
File size: 38.4 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "huffman.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef HAVE_BOOST_THREAD
17#include <boost/thread.hpp>
18#endif
19
20class zofits : public ofits
21{
22 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
23 template<class S>
24 struct QueueMin : std::list<S>
25 {
26 typename std::list<S>::iterator begin()
27 {
28 return min_element(std::list<S>::begin(), std::list<S>::end());
29 }
30 };
31
32
    //catalog types
    /// One catalog cell: size and byte offset of one column's compressed data inside a tile.
    /// Serialized to disk as two 64-bit integers, hence the packed layout.
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;   ///< Size of this column in the tile
        int64_t second;  ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef std::vector<CatalogEntry> CatalogRow;  ///< catalog entries of all columns for one tile
    typedef std::list<CatalogRow> CatalogType;     ///< whole-file catalog: one row per tile
43
44
45 /// Parameters required to write a tile to disk
46 struct WriteTarget
47 {
48 bool operator < (const WriteTarget& other) const
49 {
50 return tile_num < other.tile_num;
51 }
52
53 WriteTarget() { }
54 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
55
56 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
57 uint32_t size; ///< Size to write
58 std::shared_ptr<char> data; ///< Memory block to write
59 };
60
61
    /// Parameters required to compress a tile of data
    /// NOTE(review): catalog_entry is a reference, so the referenced catalog row must
    /// outlive the compression task — rows live in fCatalog, which is only cleared in open().
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow& catalog_entry;            ///< Reference to the catalog entry to deal with
        std::shared_ptr<char> src;            ///< Original data
        std::shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget target;                   ///< Compressed data
        uint32_t num_rows;                    ///< Number of rows to compress
    };
74
75public:
76 /// static setter for the default number of threads to use. -1 means all available physical cores
77 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
78 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
79 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
80 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
81
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        NOTE(review): multiplied by 1000 below, so effectively given in kB — confirm
    ///        against DefaultMaxMemory() callers
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        // the writer queue is created stopped (false); it is started in WriteTableHeader()
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
95
    /// Constructor that also opens the target file right away.
    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        NOTE(review): multiplied by 1000 below, so effectively given in kB
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        // the writer queue is created stopped (false); it is started in WriteTableHeader()
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
110
111 //initialization of member variables
112 /// @param nt number of tiles
113 /// @param rpt number of rows per tile
114 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
115 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
116 {
117 fCheckOffset = 0;
118 fNumQueues = 0;
119
120 fNumTiles = nt==0 ? 1 : nt;
121 fNumRowsPerTile = rpt;
122
123 fRealRowWidth = 0;
124 fCatalogOffset = 0;
125 fCatalogSize = 0;
126
127 fMaxUsableMem = maxUsableMem;
128#ifdef __EXCEPTIONS
129 fThreadsException = std::exception_ptr();
130#endif
131 fErrno = 0;
132 }
133
    /// write the header of the binary table
    /// Also allocates the compression buffers and starts the worker queues
    /// (or switches them to prompt, i.e. synchronous, execution when fNumQueues==0).
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        // with zero queues everything runs synchronously in the calling thread
        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no writing error (in the writing of the data) has occurred so far
        fErrno = 0;

        return good();
    }
164
165 /// open a new file.
166 /// @param filename the name of the file
167 /// @param Whether or not the name of the extension should be added or not
168 void open(const char* filename, bool addEXTNAMEKey=true)
169 {
170 ofits::open(filename, addEXTNAMEKey);
171
172 //add compression-related header entries
173 SetBool( "ZTABLE", true, "Table is compressed");
174 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
175 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
176 SetInt( "ZPCOUNT", 0, "");
177 SetInt( "ZHEAPPTR", 0, "");
178 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
179 SetInt( "THEAP", 0, "");
180 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
181 SetFloat("ZRATIO", 0, "Compression ratio");
182 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
183
184 fCatalogSize = 0;
185 fRealRowWidth = 0;
186 fCatalogOffset = 0;
187 fCatalogSize = 0;
188 fCheckOffset = 0;
189
190 fRealColumns.clear();
191 fCatalog.clear();
192 fCatalogSum.reset();
193 fRawSum.reset();
194 }
195
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (overridden by the FACT-specific subclass that does).
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
202
    /// Returns the number of bytes per uncompressed row
    /// (the sum of size*num over all columns added so far)
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
209
210 /// Write the data catalog
211 /// @return the state of the file
212 bool WriteCatalog()
213 {
214 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
215 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
216
217 // swap the catalog bytes before writing
218 std::vector<char> swapped_catalog(total_catalog_size);
219
220 uint32_t shift = 0;
221 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
222 {
223 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
224 shift += one_catalog_row_size;
225 }
226
227 if (fCatalogSize < fNumTiles)
228 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
229
230 // first time writing ? remember where we are
231 if (fCatalogOffset == 0)
232 fCatalogOffset = tellp();
233
234 // remember where we came from
235 const off_t where_are_we = tellp();
236
237 // write to disk
238 seekp(fCatalogOffset);
239 write(swapped_catalog.data(), total_catalog_size);
240
241 if (where_are_we != fCatalogOffset)
242 seekp(where_are_we);
243
244 // udpate checksum
245 fCatalogSum.reset();
246 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
247
248 return good();
249 }
250
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// (overridden by the FACT-specific subclass; called once per row in CompressBuffer)
    virtual void DrsOffsetCalibrate(char* )
    {

    }
256
257 CatalogRow& AddOneCatalogRow()
258 {
259 // add one row to the catalog
260 fCatalog.emplace_back(CatalogRow());
261 fCatalog.back().resize(fTable.num_cols);
262 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
263 *it = CatalogEntry(0,0);
264
265 fCatalogSize++;
266
267 return fCatalog.back();
268 }
269
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal fRealRowWidth)
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to pool or rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        // rawOffset keeps the row 4-byte-aligned with respect to the whole data stream
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        // clear the first and last word of the buffer: depending on rawOffset the copied
        // row does not cover them, and stale bytes would corrupt the checksum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // tile not complete yet: nothing to compress, just report the thread errno
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // use the least occupied queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        if (!imin->emplace(InitNextCompression()))
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
#else
            gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
            errno = 0;
            return false;
#endif
        }

        errno = fErrno;
        return errno==0;
    }
347
    /// update the real number of rows
    /// NAXIS2 counts compressed (tile) rows, ZNAXIS2 the uncompressed data rows.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
355
    /// Setup the environment to compress yet another tile of data
    /// Hands the current row buffer over to the compression task and replaces it
    /// with a fresh one from the memory pool.
    /// @return the filled-up compression target (by value; it is queued afterwards)
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }
378
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Only every shrink_factor-th catalog row is kept; ZTILELEN and ZSHRINK are
    /// updated so that readers can undo the grouping.
    /// @return the shrink factor (1 when nothing had to be done)
    const uint32_t ShrinkCatalog()
    {
        //add empty row to get either the target number of rows, or a multiple of the allowed size
        // note: i and fCatalogSize grow together, so the loop stops exactly when
        // fCatalogSize reaches the next multiple of fNumTiles
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog ! entry 0 stays, entry k becomes old entry k*shrink_factor
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            // advance a second iterator to the source entry (std::list has no random access)
            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the too many entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK", shrink_factor);

        return shrink_factor;
    }
430
    /// close an open file.
    /// Waits for the worker queues, flushes the last (partial) tile, finalizes the
    /// catalog and all compression-related header keywords, then verifies the checksum.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            // the queues have already been stopped: execute synchronously in this thread
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        // compute the heap size and update the offset of every catalog entry
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                // empty columns get a null offset marker
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        std::ofstream::close();

        // release the row buffer back to the memory pool
        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrunk
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
528
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// @return true if the column could be added
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
535
    /// Overload of the simplified compressed version
    /// On disk the table declares a single 64-bit ('Q') descriptor column, while the
    /// real (uncompressed) layout is tracked in fRealColumns for the compression step.
    /// @param comp the compression scheme to apply to this column
    /// @param cnt number of elements in the column
    /// @param typechar FITS type character of one element
    /// @param name column name
    /// @param unit physical unit of the column
    /// @param comment header comment
    /// @param addHeaderKeys whether the usual column header keys should be added
    /// @return true if the column could be added
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;   // column's byte offset inside one uncompressed row

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");

        return true;
    }
561
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }

    /// Set the number of compression threads (queues).
    /// Must be called before anything has been written to the file.
    /// @param num requested number of threads; clamped to the number of available
    ///        cores minus two (one for the main thread, one for the disk writer)
    /// @return true on success, false if writing has already started
    bool SetNumThreads(uint32_t num)
    {
        // tellp()>0 means the header (and possibly data) has already been written:
        // the queues cannot be reconfigured any more
        if (tellp()>0)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
#else
            gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // at least one queue must exist, even if it will run in prompt (synchronous) mode
        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }
595
    uint32_t GetNumTiles() const { return fNumTiles; }   ///< @return the number of pre-reserved catalog tiles
    void SetNumTiles(uint32_t num) { fNumTiles=num; }    ///< set the number of pre-reserved catalog tiles (no validation performed)
598
599protected:
600
    /// Allocates the required objects.
    void reallocateBuffers()
    {
        // one chunk hosts a full tile plus per-column block headers, the tile header
        // and 8 spare bytes used for checksum alignment
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        // raw-sum buffer is padded to the next multiple of 4 bytes
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
610
    /// Actually does the writing to disk (and checksuming)
    /// The FITS checksum is computed on 4-byte words; fCheckOffset carries the
    /// alignment remainder from one tile to the next so that tiles of arbitrary
    /// size checksum consistently.
    /// @param src the buffer to write (4 spare bytes precede the payload)
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        if (fCheckOffset != 0)
        {
            // prepend zeroes (into the spare area) to restore 4-byte alignment
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the misalignment for the next call
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk (skipping the 4 spare leading bytes)
        write(src+4, sizeToWrite);

        return good();
    }
647
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// Calibrates (virtual hook), transposes and compresses one tile, then posts the
    /// result to the writing queue. Exceptions are captured and forwarded to the
    /// main thread via fThreadsException.
    /// @param target the struct hosting the parameters of the compression
    /// @return always true so the queue keeps running
    bool CompressBuffer(const CompressionTarget& target)
    {
        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        // number of rows in this tile (the last tile may be partial)
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // remember the exception so the main thread can re-throw it
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }
688
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    /// @return false when the tile is not the next one in file order (the QueueMin
    ///         queue will offer it again later), true otherwise
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // forward the exception to the main thread (or re-throw right away in prompt mode)
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
717
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row catalog row where each column's compressed size/offset is recorded
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // number of rows in this tile (the last tile may be partial)
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            // apply the column's processing sequence in order
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    // in-place preprocessing step: produces no output bytes by itself
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth ! (undo the in-place preprocessing before storing raw)
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally write the tile header in front of the compressed data
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
806
807 /// Transpose a tile to a new buffer
808 /// @param src buffer hosting the regular, row-ordered data
809 /// @param dest the target buffer that will receive the transposed data
810 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
811 {
812 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
813
814 //copy the tile and transpose it
815 for (uint32_t i=0;i<fRealColumns.size();i++)
816 {
817 switch (fRealColumns[i].block_head.getOrdering())
818 {
819 case FITS::kOrderByRow:
820 //regular, "semi-transposed" copy
821 for (uint32_t k=0;k<thisRoundNumRows;k++)
822 {
823 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
824 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
825 }
826 break;
827
828 case FITS::kOrderByCol:
829 //transposed copy
830 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
831 for (uint32_t k=0;k<thisRoundNumRows;k++)
832 {
833 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
834 dest += fRealColumns[i].col.size;
835 }
836 break;
837 };
838 }
839 }
840
841 /// Specific compression functions
842 /// @param dest the target buffer
843 /// @param src the source buffer
844 /// @param size number of bytes to copy
845 /// @return number of bytes written
846 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
847 {
848 memcpy(dest, src, size);
849 return size;
850 }
851
    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            // prepend the compressed size of each block before the data area
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        // (the caller detects the overrun via the returned size and redoes the block raw)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
897
898 /// Applies Thomas' DRS4 smoothing
899 /// @param data where to apply it
900 /// @param numElems how many elements of type int16_t are stored in the buffer
901 /// @return number of bytes modified
902 uint32_t applySMOOTHING(char* data, uint32_t numElems)
903 {
904 int16_t* short_data = reinterpret_cast<int16_t*>(data);
905 for (int j=numElems-1;j>1;j--)
906 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
907
908 return numElems*sizeof(int16_t);
909 }
910
911 /// Apply the inverse transform of the integer smoothing
912 /// @param data where to apply it
913 /// @param numElems how many elements of type int16_t are stored in the buffer
914 /// @return number of bytes modified
915 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
916 {
917 int16_t* short_data = reinterpret_cast<int16_t*>(data);
918 for (uint32_t j=2;j<numElems;j++)
919 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
920
921 return numElems*sizeof(uint16_t);
922 }
923
924
925
    //thread related stuff
    MemoryManager fMemPool;     ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;     ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads); -1 before the first tile

    std::vector<Queue<CompressionTarget>> fCompressionQueues;    ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin pops tiles in ascending order

    // catalog related stuff
    CatalogType fCatalog;       ///< Catalog for this file
    uint32_t fCatalogSize;      ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;         ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile;   ///< Number of rows per tile
    off_t fCatalogOffset;       ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum;       ///< Checksum of the catalog
    Checksum fRawSum;           ///< Raw sum (specific to FACT)
    int32_t fCheckOffset;       ///< offset to the data pointer to calculate the checksum (4-byte alignment carry between tiles)

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;            ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                     ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer;            ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException;       ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno;                                 ///< propagate errno to main thread
966};
967
968#endif
Note: See TracBrowser for help on using the repository browser.