#ifndef FACT_zofits
#define FACT_zofits

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"
#include "huffman.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef HAVE_BOOST_THREAD
#include <boost/thread.hpp>
#endif
class zofits : public ofits
{
#ifdef __MARS__ // Needed by CINT to access the structures
public:
#endif
    /// Override of begin() so that iteration starts at the smallest item in the list instead of the true begin
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };

#ifdef __CINT__
    struct CatalogEntry;
#else
    // catalog types
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;  ///< Size of this column in the tile
        int64_t second; ///< Offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));
#endif

    typedef std::vector<CatalogEntry> CatalogRow;
    typedef std::list<CatalogRow>     CatalogType;
    /// Parameters required to write a tile to disk
    struct WriteTarget
    {
        bool operator < (const WriteTarget& other) const
        {
            return tile_num < other.tile_num;
        }

        WriteTarget() { }
        WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }

        uint32_t              tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
        uint32_t              size;     ///< Size to write
        std::shared_ptr<char> data;     ///< Memory block to write
    };

    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow&           catalog_entry;  ///< Reference to the catalog entry to deal with
        std::shared_ptr<char> src;            ///< Original data
        std::shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget           target;         ///< Compressed data
        uint32_t              num_rows;       ///< Number of rows to compress
    };
public:
    /// Static setter for the default number of threads to use. -1 means all available physical cores;
    /// the initial default is 0 (no worker threads). Values >= -1 update the default, anything else only queries it.
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
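
    /* The defaults above are read once at construction time, so they must be
       changed before the zofits object is created. A minimal sketch (the
       values and file name are illustrative only):

         zofits::DefaultNumThreads(2);        // two compression threads
         zofits::DefaultNumRowsPerTile(50);   // smaller tiles
         zofits out("out.fits.fz");           // picks up both defaults
    */
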
    /// constructors
    /// @param numTiles how many data groups (tiles) should be pre-reserved in the catalog?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory (in units of 1000 bytes) can be used by the compression buffers
    zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, uint64_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data groups (tiles) should be pre-reserved in the catalog?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory (in units of 1000 bytes) can be used by the compression buffers
    zofits(const char* fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        open(fname);
        InitMemberVariables(numTiles, rowPerTile, uint64_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
    }

    zofits(const std::string &fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        open(fname);
        InitMemberVariables(numTiles, rowPerTile, uint64_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
    }
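
    /* A minimal end-to-end usage sketch. The file name, column layout and row
       content are illustrative assumptions, not requirements of this class:

         zofits file("events.fits.fz");
         file.AddColumn(1,    'J', "EventNum", "");     // one 32-bit integer
         file.AddColumn(1440, 'I', "Data",     "mV");   // 1440 16-bit samples
         file.WriteTableHeader("Events");               // starts the queues

         std::vector<char> row(file.GetBytesPerRow());  // 4 + 1440*2 bytes
         // ... fill row with one event, little endian, columns in order ...
         file.WriteRow(row.data(), row.size());

         file.close();
    */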

    /// Initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues   = 0;

        fNumTiles       = nt==0 ? 1 : nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth  = 0;
        fCatalogOffset = 0;
        fCatalogSize   = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = std::exception_ptr();
#endif
        fErrno = 0;
    }

    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        SetInt("ZNAXIS1", fRealRowWidth);

        ofits::WriteTableHeader(name);

        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no error has occurred so far in the writing of the data
        fErrno = 0;

        return good();
    }

    /// open a new file.
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the EXTNAME keyword should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE",   true,            "Table is compressed");
        SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
        SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
        SetInt(  "ZPCOUNT",  0,               "");
        SetInt(  "ZHEAPPTR", 0,               "");
        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt(  "THEAP",    0,               "");
        SetStr(  "RAWSUM",   " 0",            "Checksum of raw little endian data");
        SetFloat("ZRATIO",   0,               "Compression ratio");
        SetInt(  "ZSHRINK",  1,               "Catalog shrink factor");

        fCatalogSize   = 0;
        fRealRowWidth  = 0;
        fCatalogOffset = 0;
        fCheckOffset   = 0;

        fRealColumns.clear();
        fCatalog.clear();
        fCatalogSum.reset();
        fRawSum.reset();
    }

    void open(const std::string &filename, bool addEXTNAMEKey=true)
    {
        open(filename.c_str(), addEXTNAMEKey);
    }

    /// Super method. Does nothing as zofits does not know about DrsOffsets
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    /// Write the data catalog
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
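
    /* Catalog layout for reference (derived from the code above): one row per
       tile, each row holding a big-endian (size, offset) int64 pair per
       column. For example, a table with 3 columns uses 3*2*8 = 48 bytes per
       catalog row; revcpy performs the little-to-big-endian byte swap that
       FITS requires. */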

    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    virtual void DrsOffsetCalibrate(char*)
    {
    }

    CatalogRow& AddOneCatalogRow()
    {
        // add one row to the catalog
        fCatalog.emplace_back(CatalogRow());
        fCatalog.back().resize(fTable.num_cols);
        for (auto it=fCatalog.back().begin(); it!=fCatalog.back().end(); it++)
            *it = CatalogEntry(0,0);

        fCatalogSize++;

        return fCatalog.back();
    }

    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the bad bit of the stream (a thread might have set it before errno was updated).
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to the pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksumming.
        //Ideally this should be moved to the threads.
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksummed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // use the least occupied queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        if (!imin->emplace(InitNextCompression()))
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
#else
            gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
            errno = 0;
            return false;
#endif
        }

        errno = fErrno;
        return errno==0;
    }
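
    /* Error-handling sketch for the multi-threaded case described above: rely
       on the return value and errno rather than on the stream state (the
       variable names are illustrative):

         if (!file.WriteRow(row.data(), row.size()))
             gLog << ___err___ << "WriteRow failed: " << strerror(errno) << std::endl;
    */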

    /// update the real number of rows
    void FlushNumRows()
    {
        SetInt("NAXIS2",  (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    /// Setup the environment to compress yet another tile of data
    /// @return the compression target hosting the produced parameters
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src            = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows       = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.data     = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }

    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    uint32_t ShrinkCatalog()
    {
        //add empty rows until the catalog size is a multiple of fNumTiles
        //(the loop bound is re-evaluated as AddOneCatalogRow increases fCatalogSize)
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog !
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the surplus entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK",  shrink_factor);

        return shrink_factor;
    }
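
    /* Worked example: with fNumTiles==1000 reserved catalog rows and
       fCatalogSize==2000 written tiles, shrink_factor is 2. Every second
       catalog row survives, the surplus 1000 rows are popped, and
       fNumRowsPerTile (ZTILELEN) is doubled, so readers can still locate all
       2000 original tiles through the shrunk catalog. */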

    /// close an open file.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        int64_t heap_size         = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size         += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        if (!fFilebuf.close())
            setstate(ios_base::failbit);

        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrunk
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }

    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");

        return true;
    }

    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }
    bool SetNumThreads(uint32_t num)
    {
        if (tellp() > 0)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
#else
            gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
#endif
        //could not detect the number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        //leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }

    uint32_t GetNumTiles() const       { return fNumTiles; }
    void     SetNumTiles(uint32_t num) { fNumTiles=num; }

protected:

    /// Allocates the required objects.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksumming
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming
    }

    /// Actually does the writing to disk (and checksumming)
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char*    checkSumPointer = src+4;
        int32_t  extraBytes      = 0;
        uint32_t sizeToChecksum  = sizeToWrite;

        //should we extend the array to the left ?
        if (fCheckOffset != 0)
        {
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
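
    /* Worked example of the alignment bookkeeping above: writing a 10-byte
       tile pads the checksummed block by extraBytes=2 to 12 bytes and leaves
       fCheckOffset = (4-2)%4 = 2, so the next tile is checksummed with 2 zero
       bytes prepended. This preserves the global 4-byte phase that the FITS
       checksum algorithm requires across consecutive, unaligned tiles. */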

    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return always true: the return value required by the Queue processing
    bool CompressBuffer(const CompressionTarget& target)
    {
        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }

    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //if the data could not be written to disk, remember the error number
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }

    /// Compress a given buffer based on its source and destination
    /// src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog row to fill with the block sizes and offsets
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //remember where this column's block starts; its header is written
            //once the compressed block size is known
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0;j<head.getNumProcs();j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size is larger than the uncompressed one
            //if so, set the flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {
                //de-smooth !
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset  = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
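
    /* Resulting tile layout, as built above (sketch):

         [4-byte checksum pad][TileHeader]
           [BlockHeader col0][compressed data col0]
           [BlockHeader col1][compressed data col1]
           ...

       catalog_row[i].second holds the offset of column i's block relative to
       the start of the TileHeader, and catalog_row[i].first its size in bytes. */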

    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case FITS::kOrderByRow:
                //regular, "semi-transposed" copy
                for (uint32_t k=0;k<thisRoundNumRows;k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case FITS::kOrderByCol:
                //transposed copy
                for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            }
        }
    }
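
    /* Example for one column with num==3 elements (a,b,c) and two rows r0,r1:

         kOrderByRow keeps   a(r0) b(r0) c(r0) a(r1) b(r1) c(r1)
         kOrderByCol yields  a(r0) a(r1) b(r0) b(r1) c(r0) c(r1)

       Grouping the same array element across rows is what makes the smoothing
       and Huffman stages effective on slowly varying sampled data. */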

    /// Specific compression functions
    /// @param dest the target buffer
    /// @param src the source buffer
    /// @param size number of bytes to copy
    /// @return number of bytes written
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data element
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t    previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, the Huffman encoder does not work (and has no point).
        //Just return a size larger than the uncompressed one to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
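
    /* Output layout produced above (sketch): numRowElems uint32 values first,
       each holding the byte size of one element's Huffman stream, followed by
       the concatenated streams. If the total exceeds the uncompressed size the
       streams are not copied at all; the oversized return value makes
       compressBuffer() fall back to raw storage for the whole block. */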

    /// Applies Thomas' DRS4 smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (int j=numElems-1;j>1;j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }

    /// Apply the inverse transform of the integer smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (uint32_t j=2;j<numElems;j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }
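
    /* Round-trip example with int16_t samples {10, 12, 11, 13}:

         applySMOOTHING  : j=3: 13-(11+12)/2 = 2;   j=2: 11-(12+10)/2 = 0
                           -> {10, 12, 0, 2}
         UnApplySMOOTHING: j=2: 0+(12+10)/2 = 11;   j=3: 2+(11+12)/2 = 13
                           -> {10, 12, 11, 13}

       The inverse adds back exactly the truncated average that was subtracted
       (the neighbours are already restored when each element is processed), so
       the transform is lossless, overflow aside. */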

    //thread related stuff
    MemoryManager fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
    int32_t       fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t      fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t       fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    std::vector<Queue<CompressionTarget>>     fCompressionQueues; ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< Writing queue (=thread)

    // catalog related stuff
    CatalogType fCatalog;        ///< Catalog for this file
    uint32_t    fCatalogSize;    ///< Actual catalog size (.size() is slow on large lists)
    uint32_t    fNumTiles;       ///< Number of pre-reserved tiles
    uint32_t    fNumRowsPerTile; ///< Number of rows per tile
    off_t       fCatalogOffset;  ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum;  ///< Checksum of the catalog
    Checksum fRawSum;      ///< Raw sum (specific to FACT)
    int32_t  fCheckOffset; ///< Offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression information
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
#ifdef __MARS__ // Needed for the compilation of the dictionary
        CompressedColumn() { }
#endif
        Table::Column     col;        ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    std::vector<CompressedColumn> fRealColumns;  ///< Vector hosting the columns of the file
    uint32_t                      fRealRowWidth; ///< Width in bytes of one uncompressed row
    std::shared_ptr<char>         fSmartBuffer;  ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char>             fRawSumBuffer; ///< Buffer used for checksumming the incoming data, before compression

#ifdef __EXCEPTIONS
    std::exception_ptr fThreadsException; ///< Exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< Propagate errno to the main thread
};

#endif