source: branches/MarsMoreSimulationTruth/mcore/zofits.h@ 19364

Last change on this file since 19364 was 18594, checked in by tbretz, 10 years ago
It seems typedefs and templates need to be public with the newer compiler to work with the root dictionary (5.34/36) or it is the root version (uncritical)
File size: 40.1 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10#include "ofits.h"
11#include "huffman.h"
12#include "Queue.h"
13#include "MemoryManager.h"
14
15#ifdef HAVE_BOOST_THREAD
16#include <boost/thread.hpp>
17#else
18#include <unistd.h>
19#endif
20
#if defined(__CINT__) || !defined(__EXCEPTIONS)
// Compatibility shim: when CINT parses this header, or exceptions are disabled,
// std::exception_ptr is not available.  Declare a dummy so member declarations
// below (fThreadsException) still parse.  Deliberately injects into namespace std.
namespace std
{
    typedef void* exception_ptr;
};
#endif
27
28class zofits : public ofits
29{
30#ifdef __MARS__ // Needed by CINT to access the structures
31public:
32#endif
33 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
34 template<class S>
35 struct QueueMin : std::list<S>
36 {
37 typename std::list<S>::iterator begin()
38 {
39 return min_element(std::list<S>::begin(), std::list<S>::end());
40 }
41 };
42
#ifdef __CINT__
    // CINT doesn't like the packed attribute...
    // Therefore we give another hint of the size of the structure
    struct CatalogEntry { char dummy[16]; };
#else
    //catalog types
    // One catalog cell: the (compressed size, heap offset) pair of one column
    // inside one tile.  Kept packed (2 x int64 = 16 bytes) so the in-memory
    // layout matches the on-disk FITS catalog and the CINT dummy above.
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
        int64_t first;   ///< Size of this column in the tile
        int64_t second;  ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));
#endif

    typedef std::vector<CatalogEntry> CatalogRow;  ///< one catalog row: one entry per column
    typedef std::list<CatalogRow> CatalogType;     ///< full catalog: one row per tile
59
60 /// Parameters required to write a tile to disk
61 struct WriteTarget
62 {
63 bool operator < (const WriteTarget& other) const
64 {
65 return tile_num < other.tile_num;
66 }
67
68 WriteTarget() { }
69 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
70
71 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
72 uint32_t size; ///< Size to write
73 std::shared_ptr<char> data; ///< Memory block to write
74 };
75
76
77 /// Parameters required to compress a tile of data
78 struct CompressionTarget
79 {
80 CompressionTarget(CatalogRow& r) : catalog_entry(r)
81 {}
82
83 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
84 std::shared_ptr<char> src; ///< Original data
85 std::shared_ptr<char> transposed_src; ///< Transposed data
86 WriteTarget target; ///< Compressed data
87 uint32_t num_rows; ///< Number of rows to compress
88 };
89
90public:
91 /// static setter for the default number of threads to use. -1 means all available physical cores
92 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>-2) n=_n; return n; }
93 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
94 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
95 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
96
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        NOTE(review): the value is multiplied by 1000 below before being handed
    ///        to the memory pool, so it is effectively in kilobytes — confirm with callers.
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        (see unit note above: effectively kilobytes)
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
        open(fname);
    }

    /// std::string overload of the constructor above; identical behaviour.
    zofits(const std::string &fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, size_t(maxUsableMem)*1000),
        fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
        SetNumThreads(DefaultNumThreads());
        open(fname);
    }
138
    //initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues = 0;

        fNumTiles = nt==0 ? 1 : nt;  // at least one tile must always exist
        fNumRowsPerTile = rpt;

        fRealRowWidth = 0;
        fCatalogOffset = 0;
        fCatalogSize = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        // empty exception_ptr: no pending error from worker threads
        fThreadsException = std::exception_ptr();
#endif
        fErrno = 0;
    }
161
    /// write the header of the binary table
    /// Also (re)allocates the compression buffers and starts the worker queues
    /// (or switches them to synchronous "prompt" mode when fNumQueues==0).
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        SetInt("ZNAXIS1", fRealRowWidth);

        ofits::WriteTableHeader(name);

        // with zero threads, run compression/writing synchronously in the caller
        fCompressionQueues.front().setPromptExecution(fNumQueues==0);
        fWriteToDiskQueue.setPromptExecution(fNumQueues==0);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        //no writing error (in the writing of the data) has occured so far
        fErrno = 0;

        return good();
    }
194
195 /// open a new file.
196 /// @param filename the name of the file
197 /// @param Whether or not the name of the extension should be added or not
198 void open(const char* filename, bool addEXTNAMEKey=true)
199 {
200 ofits::open(filename, addEXTNAMEKey);
201
202 //add compression-related header entries
203 SetBool( "ZTABLE", true, "Table is compressed");
204 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
205 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
206 SetInt( "ZPCOUNT", 0, "");
207 SetInt( "ZHEAPPTR", 0, "");
208 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
209 SetInt( "THEAP", 0, "");
210 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
211 SetFloat("ZRATIO", 0, "Compression ratio");
212 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
213
214 fCatalogSize = 0;
215 fRealRowWidth = 0;
216 fCatalogOffset = 0;
217 fCatalogSize = 0;
218 fCheckOffset = 0;
219
220 fRealColumns.clear();
221 fCatalog.clear();
222 fCatalogSum.reset();
223 fRawSum.reset();
224 }
225
    /// std::string convenience overload: forwards to open(const char*, bool).
    void open(const std::string &filename, bool addEXTNAMEKey=true)
    {
        open(filename.c_str(), addEXTNAMEKey);
    }
230
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (overridden by FACT-specific subclasses).
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
244
    /// Write the data catalog
    /// Serializes the in-memory catalog (byte-reversed per 8-byte word via revcpy),
    /// writes it at the reserved position fCatalogOffset, and recomputes fCatalogSum.
    /// @return the state of the file
    bool WriteCatalog()
    {
        // each catalog row stores two 64-bit words (size, offset) per column
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        std::vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // zero-fill the reserved-but-unused tail of the catalog area
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        // seek back only if we actually moved
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
285
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// Hook for subclasses; called once per row before compression.
    virtual void DrsOffsetCalibrate(char* )
    {

    }
291
292 CatalogRow& AddOneCatalogRow()
293 {
294 // add one row to the catalog
295 fCatalog.emplace_back(CatalogRow());
296 fCatalog.back().resize(fTable.num_cols);
297 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
298 *it = CatalogEntry(0,0);
299
300 fCatalogSize++;
301
302 return fCatalog.back();
303 }
304
    /// write one row of data
    /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal GetBytesPerRow())
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something hapenned while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //copy current row to pool or rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        // The checksum works on 4-byte words: place the row at its 4-byte phase
        // in fRawSumBuffer and zero the padding bytes at both ends first.
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // tile not full yet: nothing more to do
        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        // use the least occupied queue
        const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());

        if (!imin->emplace(InitNextCompression()))
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
#else
            gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
            errno = 0;
            return false;
#endif
        }

        errno = fErrno;
        return errno==0;
    }
382
    /// update the real number of rows
    /// NAXIS2 counts catalog rows (tiles, rounded up); ZNAXIS2 counts data rows.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
390
    /// Setup the environment to compress yet another tile of data
    /// Hands the currently filled fSmartBuffer over to the compression target and
    /// allocates fresh buffers for the transposed/compressed data and the next tile.
    /// @return the filled CompressionTarget (also adds one catalog row)
    CompressionTarget InitNextCompression()
    {
        CompressionTarget target(AddOneCatalogRow());

        //fill up compression target
        target.src            = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows       = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.data     = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();

        return target;
    }
413
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Keeps every shrink_factor-th row so that fNumTiles rows remain; the rows
    /// in between are merged logically by multiplying fNumRowsPerTile.
    /// @return the shrink factor applied (1 when nothing had to be done)
    uint32_t ShrinkCatalog()
    {
        //add empty row to get either the target number of rows, or a multiple of the allowed size
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize <= fNumTiles) // nothing to do
            return 1;

        //always exact as extra rows were added just above
        const uint32_t shrink_factor = fCatalogSize / fNumTiles;

        //shrink the catalog !
        // row 0 stays in place; row k receives the content of row k*shrink_factor
        uint32_t entry_id = 1;
        auto it = fCatalog.begin();
        it++;
        for (; it != fCatalog.end(); it++)
        {
            if (entry_id >= fNumTiles)
                break;

            const uint32_t target_id = entry_id*shrink_factor;

            // advance a second iterator to the source row (std::list: no random access)
            auto jt = it;
            for (uint32_t i=0; i<target_id-entry_id; i++)
                jt++;

            *it = *jt;

            entry_id++;
        }

        const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

        //remove the too many entries
        for (uint32_t i=0;i<num_tiles_to_remove;i++)
        {
            fCatalog.pop_back();
            fCatalogSize--;
        }

        //update header keywords
        fNumRowsPerTile *= shrink_factor;

        SetInt("ZTILELEN", fNumRowsPerTile);
        SetInt("ZSHRINK", shrink_factor);

        return shrink_factor;
    }
465
    /// close an open file.
    /// Drains the worker queues, flushes the last partial tile, finalizes the
    /// catalog and header keywords, verifies the checksum and closes the stream.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something hapenned while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != std::exception_ptr())
            std::rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        // the queues are stopped, so switch them to synchronous execution first
        if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
        {
            fWriteToDiskQueue.enablePromptExecution();
            fCompressionQueues.front().enablePromptExecution();
            fCompressionQueues.front().emplace(InitNextCompression());
        }

        AlignTo2880Bytes();

        // compute the total heap size and fill in the per-column heap offsets
        // (a zero-sized column gets offset 0)
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size         += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        const uint32_t shrink_factor = ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        if (!fFilebuf.close())
            setstate(ios_base::failbit);

        // release the row buffer back to the pool
        fSmartBuffer = std::shared_ptr<char>();

        //restore the number of rows per tile in case the catalog has been shrinked
        if (shrink_factor != 1)
            fNumRowsPerTile /= shrink_factor;

        if ((checksm+fDataSum).valid())
            return true;

        std::ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw std::runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
        return false;
#endif
    }
564
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
                   const std::string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    /// Declares the column as a single 'Q' (variable-length descriptor) entry in the
    /// base class and records the real layout + compression scheme in fRealColumns.
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
                   const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = FITS::SizeFromType(typechar);

        Table::Column col;
        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;   // column starts at the current end of the row

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        // ZFORMn / ZCTYPn describe the uncompressed format and compression of column n
        SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+FITS::CommentFromType(typechar));
        SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");

        return true;
    }
597
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }

    /// Set the number of worker threads (0 = synchronous operation).
    /// Must be called before any data has been written.
    /// @param num requested number of threads; clamped to the available cores minus two
    /// @return false if the file is already being written
    bool SetNumThreads(uint32_t num)
    {
        if (tellp()>0)
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
#else
            gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef HAVE_BOOST_THREAD
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = std::thread::hardware_concurrency();
        if (num_available_cores == 0)
            num_available_cores = sysconf(_SC_NPROCESSORS_ONLN);
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // at least one queue always exists; with num==0 it runs in prompt (synchronous) mode
        fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
        fNumQueues = num;

        return true;
    }
633
    uint32_t GetNumTiles() const { return fNumTiles; }     ///< number of pre-reserved catalog tiles
    void SetNumTiles(uint32_t num) { fNumTiles=num; }      ///< set number of pre-reserved catalog tiles
636
637protected:
638
    /// Allocates the required objects.
    /// Sizes one memory-pool chunk to hold a full tile plus all per-column block
    /// headers, the tile header and 4+4 bytes of checksum padding.
    void reallocateBuffers()
    {
        size_t total_block_head_size = 0;
        for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)
            total_block_head_size += it->block_head.getSizeOnDisk();

        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + total_block_head_size + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);
        fSmartBuffer = fMemPool.malloc();
        // room for one row plus the 4-byte alignment padding used by WriteRow
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
651
    /// Actually does the writing to disk (and checksuming)
    /// The first 4 bytes of src are reserved scratch space so the checksummed
    /// region can be extended to the left to keep the 4-byte phase across calls.
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;

        //should we extend the array to the left ?
        // fCheckOffset carries the 4-byte phase left over from the previous call
        if (fCheckOffset != 0)
        {
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right ?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the phase for the next call
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk (skipping the 4 scratch bytes)
        write(src+4, sizeToWrite);

        return good();
    }
688
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// Calibrates, transposes and compresses one tile, then posts the result to
    /// the disk-writer queue.  Exceptions are stored in fThreadsException and
    /// re-thrown in the main thread (or immediately when running synchronously).
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    bool CompressBuffer(const CompressionTarget& target)
    {
        //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        // the last tile may be partial: derive its true row count
        const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        for (uint32_t i=0;i<thisRoundNumRows;i++)
        {
            char* target_location = target.src.get() + fRealRowWidth*i;
            DrsOffsetCalibrate(target_location);
        }
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);

            //compress the buffer
            const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            fWriteToDiskQueue.emplace(target.target, compressed_size);

#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif

        return true;
    }
729
    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// Tiles must be written strictly in order; a tile that arrives early is
    /// rejected with `false` — presumably the Queue re-offers it later (TODO:
    /// confirm against Queue.h semantics).
    /// @param target the struct hosting the write parameters
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = std::current_exception();
            if (fNumQueues == 0)
                std::rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }
758
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// Runs each column's processing sequence; if a column ends up larger than
    /// its raw size it is redone uncompressed.  Fills catalog_row with the
    /// per-column (compressed size, offset-in-tile) pairs.
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row catalog entries to fill in
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // the last tile may be partial: derive its true row count
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                    case FITS::kFactRaw:
                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactSmoothing:
                        // in-place transform: produces no output bytes itself
                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                        break;

                    case FITS::kFactHuffman16:
                        if (head.getOrdering() == FITS::kOrderByCol)
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                        else
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                        break;
                }
            }

            //check if compressed size is larger than uncompressed
            //if so set flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {
                //de-smooth !
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                // fallback: store the column raw, with a default (raw) block header
                FITS::Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally write the tile header at the start of the destination buffer
        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }
847
    /// Transpose a tile to a new buffer
    /// Layout depends on each column's requested ordering: kOrderByRow keeps the
    /// row-major order per column, kOrderByCol groups all rows of one element index.
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    /// @param num_rows number of rows in the (possibly partial) tile
    void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
    {
        // the last tile may be partial: derive its true row count
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
                case FITS::kOrderByRow:
                    //regular, "semi-transposed" copy
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                    }
                    break;

                case FITS::kOrderByCol:
                    //transposed copy
                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                        for (uint32_t k=0;k<thisRoundNumRows;k++)
                        {
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                            dest += fRealColumns[i].col.size;
                        }
                    break;
            };
        }
    }
881
882 /// Specific compression functions
883 /// @param dest the target buffer
884 /// @param src the source buffer
885 /// @param size number of bytes to copy
886 /// @return number of bytes written
887 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
888 {
889 memcpy(dest, src, size);
890 return size;
891 }
892
    /// Do huffman encoding
    /// Each row-element is encoded as a separate Huffman stream; a table of
    /// per-stream sizes (one uint32 each) precedes the concatenated streams.
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        std::string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
            return 0;
#endif
        }

        // first pass: write the size table while accumulating all streams
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        // (if larger, the caller detects it via the returned size and redoes raw)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
938
939 /// Applies Thomas' DRS4 smoothing
940 /// @param data where to apply it
941 /// @param numElems how many elements of type int16_t are stored in the buffer
942 /// @return number of bytes modified
943 uint32_t applySMOOTHING(char* data, uint32_t numElems)
944 {
945 int16_t* short_data = reinterpret_cast<int16_t*>(data);
946 for (int j=numElems-1;j>1;j--)
947 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
948
949 return numElems*sizeof(int16_t);
950 }
951
952 /// Apply the inverse transform of the integer smoothing
953 /// @param data where to apply it
954 /// @param numElems how many elements of type int16_t are stored in the buffer
955 /// @return number of bytes modified
956 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
957 {
958 int16_t* short_data = reinterpret_cast<int16_t*>(data);
959 for (uint32_t j=2;j<numElems;j++)
960 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
961
962 return numElems*sizeof(uint16_t);
963 }
964
965
966
    //thread related stuff
    MemoryManager fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
    int32_t       fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t      fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t       fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    std::vector<Queue<CompressionTarget>> fCompressionQueues;  ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin pops the lowest tile first

    // catalog related stuff
    CatalogType fCatalog;         ///< Catalog for this file
    uint32_t    fCatalogSize;     ///< Actual catalog size (.size() is slow on large lists)
    uint32_t    fNumTiles;        ///< Number of pre-reserved tiles
    uint32_t    fNumRowsPerTile;  ///< Number of rows per tile
    off_t       fCatalogOffset;   ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum;  ///< Checksum of the catalog
    Checksum fRawSum;      ///< Raw sum (specific to FACT)
    int32_t  fCheckOffset; ///< offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression information
    public: // Public for the root dictionary
    /// One column of the table plus the compression scheme attached to it.
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
#ifdef __MARS__ // Needed for the compilation of the dictionary
        CompressedColumn() { }
#endif
        Table::Column col;             ///< the regular column entry
        FITS::Compression block_head;  ///< the compression data associated with that column
    };
1003
    protected:
    std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                     ///< Width in bytes of one uncompressed row
    std::shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    std::vector<char> fRawSumBuffer;            ///< buffer used for checksuming the incoming data, before compression

    std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
    int fErrno;                           ///< propagate errno to main thread
1012};
1013
1014#endif
Note: See TracBrowser for help on using the repository browser.