source: branches/Mars_use_drstimefiles/mcore/zofits.h@ 19779

Last change on this file since 19779 was 17779, checked in by tbretz, 11 years ago
Use COmmentFromType and SizeFromType from FITS namespace; try using sysconf if std::thread::hardware_concurrency fails; added a fix in reallocateBuffer which could cause a buffer overrun; open must be called in the ocnstructor after the variables have been initialized
File size: 40.0 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10#include "ofits.h"
11#include "huffman.h"
12#include "Queue.h"
13#include "MemoryManager.h"
14
15#ifdef HAVE_BOOST_THREAD
16#include <boost/thread.hpp>
17#else
18#include <unistd.h>
19#endif
20
21#if defined(__CINT__) || !defined(__EXCEPTIONS)
22namespace std
23{
24 typedef void* exception_ptr;
25};
26#endif
27
28class zofits : public ofits
29{
30#ifdef __MARS__ // Needed by CINT to access the structures
31public:
32#endif
33 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
34 template<class S>
35 struct QueueMin : std::list<S>
36 {
37 typename std::list<S>::iterator begin()
38 {
39 return min_element(std::list<S>::begin(), std::list<S>::end());
40 }
41 };
42
43#ifdef __CINT__
44 // CINT doesn't like the packed attribute...
45 // Therefore we give another hint of the size of the structure
46 struct CatalogEntry { char dummy[16]; };
47#else
48 //catalog types
49 struct CatalogEntry
50 {
51 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
52 int64_t first; ///< Size of this column in the tile
53 int64_t second; ///< offset of this column in the tile, from the start of the heap area
54 } __attribute__((__packed__));
55#endif
56
57 typedef std::vector<CatalogEntry> CatalogRow;
58 typedef std::list<CatalogRow> CatalogType;
59
60 /// Parameters required to write a tile to disk
61 struct WriteTarget
62 {
63 bool operator < (const WriteTarget& other) const
64 {
65 return tile_num < other.tile_num;
66 }
67
68 WriteTarget() { }
69 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
70
71 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
72 uint32_t size; ///< Size to write
73 std::shared_ptr<char> data; ///< Memory block to write
74 };
75
76
77 /// Parameters required to compress a tile of data
78 struct CompressionTarget
79 {
80 CompressionTarget(CatalogRow& r) : catalog_entry(r)
81 {}
82
83 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with
84 std::shared_ptr<char> src; ///< Original data
85 std::shared_ptr<char> transposed_src; ///< Transposed data
86 WriteTarget target; ///< Compressed data
87 uint32_t num_rows; ///< Number of rows to compress
88 };
89
90public:
91 /// static setter for the default number of threads to use. -1 means all available physical cores
92 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>-2) n=_n; return n; }
93 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
94 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
95 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
96
97 /// constructors
98 /// @param numTiles how many data groups should be pre-reserved ?
99 /// @param rowPerTile how many rows will be grouped together in a single tile
100 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
101 zofits(uint32_t numTiles = DefaultMaxNumTiles(),
102 uint32_t rowPerTile = DefaultNumRowsPerTile(),
103 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
104 fMemPool(0, size_t(maxUsableMem)*1000),
105 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
106 {
107 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
108 SetNumThreads(DefaultNumThreads());
109 }
110
111 /// @param fname the target filename
112 /// @param numTiles how many data groups should be pre-reserved ?
113 /// @param rowPerTile how many rows will be grouped together in a single tile
114 /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
115 zofits(const char* fname,
116 uint32_t numTiles = DefaultMaxNumTiles(),
117 uint32_t rowPerTile = DefaultNumRowsPerTile(),
118 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
119 fMemPool(0, size_t(maxUsableMem)*1000),
120 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
121 {
122 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
123 SetNumThreads(DefaultNumThreads());
124 open(fname);
125 }
126
127 zofits(const std::string &fname,
128 uint32_t numTiles = DefaultMaxNumTiles(),
129 uint32_t rowPerTile = DefaultNumRowsPerTile(),
130 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
131 fMemPool(0, size_t(maxUsableMem)*1000),
132 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
133 {
134 InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
135 SetNumThreads(DefaultNumThreads());
136 open(fname);
137 }
138
139 //initialization of member variables
140 /// @param nt number of tiles
141 /// @param rpt number of rows per tile
142 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
143 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
144 {
145 fCheckOffset = 0;
146 fNumQueues = 0;
147
148 fNumTiles = nt==0 ? 1 : nt;
149 fNumRowsPerTile = rpt;
150
151 fRealRowWidth = 0;
152 fCatalogOffset = 0;
153 fCatalogSize = 0;
154
155 fMaxUsableMem = maxUsableMem;
156#ifdef __EXCEPTIONS
157 fThreadsException = std::exception_ptr();
158#endif
159 fErrno = 0;
160 }
161
162 /// write the header of the binary table
163 /// @param name the name of the table to be created
164 /// @return the state of the file
165 virtual bool WriteTableHeader(const char* name="DATA")
166 {
167 reallocateBuffers();
168
169 SetInt("ZNAXIS1", fRealRowWidth);
170
171 ofits::WriteTableHeader(name);
172
173 fCompressionQueues.front().setPromptExecution(fNumQueues==0);
174 fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
175
176 if (fNumQueues != 0)
177 {
178 //start the compression queues
179 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
180 it->start();
181
182 //start the disk writer
183 fWriteToDiskQueue.start();
184 }
185
186 //mark that no tile has been written so far
187 fLatestWrittenTile = -1;
188
189 //no wiring error (in the writing of the data) has occured so far
190 fErrno = 0;
191
192 return good();
193 }
194
195 /// open a new file.
196 /// @param filename the name of the file
197 /// @param Whether or not the name of the extension should be added or not
198 void open(const char* filename, bool addEXTNAMEKey=true)
199 {
200 ofits::open(filename, addEXTNAMEKey);
201
202 //add compression-related header entries
203 SetBool( "ZTABLE", true, "Table is compressed");
204 SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
205 SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
206 SetInt( "ZPCOUNT", 0, "");
207 SetInt( "ZHEAPPTR", 0, "");
208 SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
209 SetInt( "THEAP", 0, "");
210 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
211 SetFloat("ZRATIO", 0, "Compression ratio");
212 SetInt( "ZSHRINK", 1, "Catalog shrink factor");
213
214 fCatalogSize = 0;
215 fRealRowWidth = 0;
216 fCatalogOffset = 0;
217 fCatalogSize = 0;
218 fCheckOffset = 0;
219
220 fRealColumns.clear();
221 fCatalog.clear();
222 fCatalogSum.reset();
223 fRawSum.reset();
224 }
225
226 void open(const std::string &filename, bool addEXTNAMEKey=true)
227 {
228 open(filename.c_str(), addEXTNAMEKey);
229 }
230
231 /// Super method. does nothing as zofits does not know about DrsOffsets
232 /// @return the state of the file
233 virtual bool WriteDrsOffsetsTable()
234 {
235 return good();
236 }
237
238 /// Returns the number of bytes per uncompressed row
239 /// @return number of bytes per uncompressed row
240 uint32_t GetBytesPerRow() const
241 {
242 return fRealRowWidth;
243 }
244
245 /// Write the data catalog
246 /// @return the state of the file
247 bool WriteCatalog()
248 {
249 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
250 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
251
252 // swap the catalog bytes before writing
253 std::vector<char> swapped_catalog(total_catalog_size);
254
255 uint32_t shift = 0;
256 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
257 {
258 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
259 shift += one_catalog_row_size;
260 }
261
262 if (fCatalogSize < fNumTiles)
263 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
264
265 // first time writing ? remember where we are
266 if (fCatalogOffset == 0)
267 fCatalogOffset = tellp();
268
269 // remember where we came from
270 const off_t where_are_we = tellp();
271
272 // write to disk
273 seekp(fCatalogOffset);
274 write(swapped_catalog.data(), total_catalog_size);
275
276 if (where_are_we != fCatalogOffset)
277 seekp(where_are_we);
278
279 // udpate checksum
280 fCatalogSum.reset();
281 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
282
283 return good();
284 }
285
286 /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
287 virtual void DrsOffsetCalibrate(char* )
288 {
289
290 }
291
292 CatalogRow& AddOneCatalogRow()
293 {
294 // add one row to the catalog
295 fCatalog.emplace_back(CatalogRow());
296 fCatalog.back().resize(fTable.num_cols);
297 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
298 *it = CatalogEntry(0,0);
299
300 fCatalogSize++;
301
302 return fCatalog.back();
303 }
304
305 /// write one row of data
306 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
307 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
308 /// errno will then contain the correct error number of the last error which happened during writing.
309 /// @param ptr the source buffer
310 /// @param the number of bytes to write
311 /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
312 bool WriteRow(const void* ptr, size_t cnt, bool = true)
313 {
314 if (cnt != fRealRowWidth)
315 {
316#ifdef __EXCEPTIONS
317 throw std::runtime_error("Wrong size of row given to WriteRow");
318#else
319 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
320 return false;
321#endif
322 }
323
324#ifdef __EXCEPTIONS
325 //check if something hapenned while the compression threads were working
326 //if so, re-throw the exception that was generated
327 if (fThreadsException != std::exception_ptr())
328 std::rethrow_exception(fThreadsException);
329#endif
330
331 //copy current row to pool or rows waiting for compression
332 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
333 memcpy(target_location, ptr, fRealRowWidth);
334
335 //for now, make an extra copy of the data, for RAWSUM checksuming.
336 //Ideally this should be moved to the threads
337 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
338 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
339 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
340 char* buffer = fRawSumBuffer.data() + rawOffset;
341 auto ib = fRawSumBuffer.begin();
342 auto ie = fRawSumBuffer.rbegin();
343 *ib++ = 0;
344 *ib++ = 0;
345 *ib++ = 0;
346 *ib = 0;
347
348 *ie++ = 0;
349 *ie++ = 0;
350 *ie++ = 0;
351 *ie = 0;
352
353 memcpy(buffer, ptr, fRealRowWidth);
354
355 fRawSum.add(fRawSumBuffer, false);
356
357 fTable.num_rows++;
358
359 if (fTable.num_rows % fNumRowsPerTile != 0)
360 {
361 errno = fErrno;
362 return errno==0;
363 }
364
365 // use the least occupied queue
366 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
367
368 if (!imin->emplace(InitNextCompression()))
369 {
370#ifdef __EXCEPTIONS
371 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
372#else
373 gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
374 errno = 0;
375 return false;
376#endif
377 }
378
379 errno = fErrno;
380 return errno==0;
381 }
382
383 /// update the real number of rows
384 void FlushNumRows()
385 {
386 SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
387 SetInt("ZNAXIS2", fTable.num_rows);
388 FlushHeader();
389 }
390
391 /// Setup the environment to compress yet another tile of data
392 /// @param target the struct where to host the produced parameters
393 CompressionTarget InitNextCompression()
394 {
395 CompressionTarget target(AddOneCatalogRow());
396
397 //fill up compression target
398 target.src = fSmartBuffer;
399 target.transposed_src = fMemPool.malloc();
400 target.num_rows = fTable.num_rows;
401
402 //fill up write to disk target
403 WriteTarget &write_target = target.target;
404 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
405 write_target.size = 0;
406 write_target.data = fMemPool.malloc();
407
408 //get a new buffer to host the incoming data
409 fSmartBuffer = fMemPool.malloc();
410
411 return target;
412 }
413
414 /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
415 uint32_t ShrinkCatalog()
416 {
417 //add empty row to get either the target number of rows, or a multiple of the allowed size
418 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
419 AddOneCatalogRow();
420
421 //did we write more rows than what the catalog could host ?
422 if (fCatalogSize <= fNumTiles) // nothing to do
423 return 1;
424
425 //always exact as extra rows were added just above
426 const uint32_t shrink_factor = fCatalogSize / fNumTiles;
427
428 //shrink the catalog !
429 uint32_t entry_id = 1;
430 auto it = fCatalog.begin();
431 it++;
432 for (; it != fCatalog.end(); it++)
433 {
434 if (entry_id >= fNumTiles)
435 break;
436
437 const uint32_t target_id = entry_id*shrink_factor;
438
439 auto jt = it;
440 for (uint32_t i=0; i<target_id-entry_id; i++)
441 jt++;
442
443 *it = *jt;
444
445 entry_id++;
446 }
447
448 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
449
450 //remove the too many entries
451 for (uint32_t i=0;i<num_tiles_to_remove;i++)
452 {
453 fCatalog.pop_back();
454 fCatalogSize--;
455 }
456
457 //update header keywords
458 fNumRowsPerTile *= shrink_factor;
459
460 SetInt("ZTILELEN", fNumRowsPerTile);
461 SetInt("ZSHRINK", shrink_factor);
462
463 return shrink_factor;
464 }
465
466 /// close an open file.
467 /// @return the state of the file
468 bool close()
469 {
470 // stop compression and write threads
471 for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
472 it->wait();
473
474 fWriteToDiskQueue.wait();
475
476 if (tellp() < 0)
477 return false;
478
479#ifdef __EXCEPTIONS
480 //check if something hapenned while the compression threads were working
481 //if so, re-throw the exception that was generated
482 if (fThreadsException != std::exception_ptr())
483 std::rethrow_exception(fThreadsException);
484#endif
485
486 //write the last tile of data (if any)
487 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
488 {
489 fWriteToDiskQueue.enablePromptExecution();
490 fCompressionQueues.front().enablePromptExecution();
491 fCompressionQueues.front().emplace(InitNextCompression());
492 }
493
494 AlignTo2880Bytes();
495
496 int64_t heap_size = 0;
497 int64_t compressed_offset = 0;
498 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
499 {
500 compressed_offset += sizeof(FITS::TileHeader);
501 heap_size += sizeof(FITS::TileHeader);
502 for (uint32_t j=0; j<it->size(); j++)
503 {
504 heap_size += (*it)[j].first;
505 (*it)[j].second = compressed_offset;
506 compressed_offset += (*it)[j].first;
507 if ((*it)[j].first == 0)
508 (*it)[j].second = 0;
509 }
510 }
511
512 const uint32_t shrink_factor = ShrinkCatalog();
513
514 //update header keywords
515 SetInt("ZNAXIS1", fRealRowWidth);
516 SetInt("ZNAXIS2", fTable.num_rows);
517
518 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
519
520 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
521 const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
522
523 SetInt("THEAP", total_num_tiles_written*total_catalog_width);
524 SetInt("NAXIS1", total_catalog_width);
525 SetInt("NAXIS2", total_num_tiles_written);
526 SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
527
528 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
529 SetFloat("ZRATIO", compression_ratio);
530
531 //add to the heap size the size of the gap between the catalog and the actual heap
532 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
533
534 SetInt("PCOUNT", heap_size, "size of special data area");
535
536 //Just for updating the fCatalogSum value
537 WriteCatalog();
538
539 fDataSum += fCatalogSum;
540
541 const Checksum checksm = UpdateHeaderChecksum();
542
543 if (!fFilebuf.close())
544 setstate(ios_base::failbit);
545
546 fSmartBuffer = std::shared_ptr<char>();
547
548 //restore the number of rows per tile in case the catalog has been shrinked
549 if (shrink_factor != 1)
550 fNumRowsPerTile /= shrink_factor;
551
552 if ((checksm+fDataSum).valid())
553 return true;
554
555 std::ostringstream sout;
556 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
557#ifdef __EXCEPTIONS
558 throw std::runtime_error(sout.str());
559#else
560 gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
561 return false;
562#endif
563 }
564
565 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
566 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
567 const std::string& comment="", bool addHeaderKeys=true)
568 {
569 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
570 }
571
572 /// Overload of the simplified compressed version
573 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
574 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
575 {
576 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
577 return false;
578
579 const size_t size = FITS::SizeFromType(typechar);
580
581 Table::Column col;
582 col.name = name;
583 col.type = typechar;
584 col.num = cnt;
585 col.size = size;
586 col.offset = fRealRowWidth;
587
588 fRealRowWidth += size*cnt;
589
590 fRealColumns.emplace_back(col, comp);
591
592 SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+FITS::CommentFromType(typechar));
593 SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
594
595 return true;
596 }
597
598 /// Get and set the actual number of threads for this object
599 int32_t GetNumThreads() const { return fNumQueues; }
600 bool SetNumThreads(uint32_t num)
601 {
602 if (tellp()>0)
603 {
604#ifdef __EXCEPTIONS
605 throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
606#else
607 gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
608#endif
609 return false;
610 }
611
612 //get number of physically available threads
613#ifdef HAVE_BOOST_THREAD
614 unsigned int num_available_cores = boost::thread::hardware_concurrency();
615#else
616 unsigned int num_available_cores = std::thread::hardware_concurrency();
617 if (num_available_cores == 0)
618 num_available_cores = sysconf(_SC_NPROCESSORS_ONLN);
619#endif
620 // could not detect number of available cores from system properties...
621 if (num_available_cores == 0)
622 num_available_cores = 1;
623
624 // leave one core for the main thread and one for the writing
625 if (num > num_available_cores)
626 num = num_available_cores>2 ? num_available_cores-2 : 1;
627
628 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
629 fNumQueues = num;
630
631 return true;
632 }
633
634 uint32_t GetNumTiles() const { return fNumTiles; }
635 void SetNumTiles(uint32_t num) { fNumTiles=num; }
636
637protected:
638
639 /// Allocates the required objects.
640 void reallocateBuffers()
641 {
642 size_t total_block_head_size = 0;
643 for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)
644 total_block_head_size += it->block_head.getSizeOnDisk();
645
646 const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + total_block_head_size + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
647 fMemPool.setChunkSize(chunk_size);
648 fSmartBuffer = fMemPool.malloc();
649 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
650 }
651
652 /// Actually does the writing to disk (and checksuming)
653 /// @param src the buffer to write
654 /// @param sizeToWrite how many bytes should be written
655 /// @return the state of the file
656 bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
657 {
658 char* checkSumPointer = src+4;
659 int32_t extraBytes = 0;
660 uint32_t sizeToChecksum = sizeToWrite;
661
662 //should we extend the array to the left ?
663 if (fCheckOffset != 0)
664 {
665 sizeToChecksum += fCheckOffset;
666 checkSumPointer -= fCheckOffset;
667 memset(checkSumPointer, 0, fCheckOffset);
668 }
669
670 //should we extend the array to the right ?
671 if (sizeToChecksum%4 != 0)
672 {
673 extraBytes = 4 - (sizeToChecksum%4);
674 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
675 sizeToChecksum += extraBytes;
676 }
677
678 //do the checksum
679 fDataSum.add(checkSumPointer, sizeToChecksum);
680
681 fCheckOffset = (4 - extraBytes)%4;
682
683 //write data to disk
684 write(src+4, sizeToWrite);
685
686 return good();
687 }
688
689 /// Compress a given buffer based on the target. This is the method executed by the threads
690 /// @param target the struct hosting the parameters of the compression
691 /// @return number of bytes of the compressed data, or always 1 when used by the Queues
692 bool CompressBuffer(const CompressionTarget& target)
693 {
694 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
695 //Really do not understand what's wrong...
696 //calibrate data if required
697 const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
698 for (uint32_t i=0;i<thisRoundNumRows;i++)
699 {
700 char* target_location = target.src.get() + fRealRowWidth*i;
701 DrsOffsetCalibrate(target_location);
702 }
703#ifdef __EXCEPTIONS
704 try
705 {
706#endif
707 //transpose the original data
708 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
709
710 //compress the buffer
711 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
712
713 //post the result to the writing queue
714 //get a copy so that it becomes non-const
715 fWriteToDiskQueue.emplace(target.target, compressed_size);
716
717#ifdef __EXCEPTIONS
718 }
719 catch (...)
720 {
721 fThreadsException = std::current_exception();
722 if (fNumQueues == 0)
723 std::rethrow_exception(fThreadsException);
724 }
725#endif
726
727 return true;
728 }
729
730 /// Write one compressed tile to disk. This is the method executed by the writing thread
731 /// @param target the struct hosting the write parameters
732 bool WriteBufferToDisk(const WriteTarget& target)
733 {
734 //is this the tile we're supposed to write ?
735 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
736 return false;
737
738 fLatestWrittenTile++;
739
740#ifdef __EXCEPTIONS
741 try
742 {
743#endif
744 //could not write the data to disk
745 if (!writeCompressedDataToDisk(target.data.get(), target.size))
746 fErrno = errno;
747#ifdef __EXCEPTIONS
748 }
749 catch (...)
750 {
751 fThreadsException = std::current_exception();
752 if (fNumQueues == 0)
753 std::rethrow_exception(fThreadsException);
754 }
755#endif
756 return true;
757 }
758
759 /// Compress a given buffer based on its source and destination
760 //src cannot be const, as applySMOOTHING is done in place
761 /// @param dest the buffer hosting the compressed data
762 /// @param src the buffer hosting the transposed data
763 /// @param num_rows the number of uncompressed rows in the transposed buffer
764 /// @param the number of bytes of the compressed data
765 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
766 {
767 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
768 uint32_t offset = 0;
769
770 //skip the checksum reserved area
771 dest += 4;
772
773 //skip the 'TILE' marker and tile size entry
774 uint64_t compressedOffset = sizeof(FITS::TileHeader);
775
776 //now compress each column one by one by calling compression on arrays
777 for (uint32_t i=0;i<fRealColumns.size();i++)
778 {
779 catalog_row[i].second = compressedOffset;
780
781 if (fRealColumns[i].col.num == 0)
782 continue;
783
784 FITS::Compression& head = fRealColumns[i].block_head;
785
786 //set the default byte telling if uncompressed the compressed Flag
787 const uint64_t previousOffset = compressedOffset;
788
789 //skip header data
790 compressedOffset += head.getSizeOnDisk();
791
792 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
793 {
794 switch (head.getProc(j))
795 {
796 case FITS::kFactRaw:
797 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
798 break;
799
800 case FITS::kFactSmoothing:
801 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
802 break;
803
804 case FITS::kFactHuffman16:
805 if (head.getOrdering() == FITS::kOrderByCol)
806 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
807 else
808 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
809 break;
810 }
811 }
812
813 //check if compressed size is larger than uncompressed
814 //if so set flag and redo it uncompressed
815 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
816 {
817 //de-smooth !
818 if (head.getProc(0) == FITS::kFactSmoothing)
819 UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
820
821 FITS::Compression he;
822
823 compressedOffset = previousOffset + he.getSizeOnDisk();
824 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
825
826 he.SetBlockSize(compressedOffset - previousOffset);
827 he.Memcpy(dest+previousOffset);
828
829 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
830
831 catalog_row[i].first = compressedOffset - catalog_row[i].second;
832 continue;
833 }
834
835 head.SetBlockSize(compressedOffset - previousOffset);
836 head.Memcpy(dest + previousOffset);
837
838 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
839 catalog_row[i].first = compressedOffset - catalog_row[i].second;
840 }
841
842 const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
843 memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
844
845 return compressedOffset;
846 }
847
848 /// Transpose a tile to a new buffer
849 /// @param src buffer hosting the regular, row-ordered data
850 /// @param dest the target buffer that will receive the transposed data
851 void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
852 {
853 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
854
855 //copy the tile and transpose it
856 for (uint32_t i=0;i<fRealColumns.size();i++)
857 {
858 switch (fRealColumns[i].block_head.getOrdering())
859 {
860 case FITS::kOrderByRow:
861 //regular, "semi-transposed" copy
862 for (uint32_t k=0;k<thisRoundNumRows;k++)
863 {
864 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
865 dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
866 }
867 break;
868
869 case FITS::kOrderByCol:
870 //transposed copy
871 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
872 for (uint32_t k=0;k<thisRoundNumRows;k++)
873 {
874 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
875 dest += fRealColumns[i].col.size;
876 }
877 break;
878 };
879 }
880 }
881
882 /// Specific compression functions
883 /// @param dest the target buffer
884 /// @param src the source buffer
885 /// @param size number of bytes to copy
886 /// @return number of bytes written
887 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
888 {
889 memcpy(dest, src, size);
890 return size;
891 }
892
893 /// Do huffman encoding
894 /// @param dest the buffer that will receive the compressed data
895 /// @param src the buffer hosting the transposed data
896 /// @param numRows number of rows of data in the transposed buffer
897 /// @param sizeOfElems size in bytes of one data elements
898 /// @param numRowElems number of elements on each row
899 /// @return number of bytes written
900 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
901 {
902 std::string huffmanOutput;
903 uint32_t previousHuffmanSize = 0;
904
905 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
906 if (numRows < 2)
907 return numRows*sizeOfElems*numRowElems + 1000;
908
909 if (sizeOfElems < 2 )
910 {
911#ifdef __EXCEPTIONS
912 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
913#else
914 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
915 return 0;
916#endif
917 }
918
919 uint32_t huffmanOffset = 0;
920 for (uint32_t j=0;j<numRowElems;j++)
921 {
922 Huffman::Encode(huffmanOutput,
923 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
924 numRows*(sizeOfElems/2));
925 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
926 huffmanOffset += sizeof(uint32_t);
927 previousHuffmanSize = huffmanOutput.size();
928 }
929
930 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
931
932 //only copy if not larger than not-compressed size
933 if (totalSize < numRows*sizeOfElems*numRowElems)
934 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
935
936 return totalSize;
937 }
938
939 /// Applies Thomas' DRS4 smoothing
940 /// @param data where to apply it
941 /// @param numElems how many elements of type int16_t are stored in the buffer
942 /// @return number of bytes modified
943 uint32_t applySMOOTHING(char* data, uint32_t numElems)
944 {
945 int16_t* short_data = reinterpret_cast<int16_t*>(data);
946 for (int j=numElems-1;j>1;j--)
947 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
948
949 return numElems*sizeof(int16_t);
950 }
951
952 /// Apply the inverse transform of the integer smoothing
953 /// @param data where to apply it
954 /// @param numElems how many elements of type int16_t are stored in the buffer
955 /// @return number of bytes modified
956 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
957 {
958 int16_t* short_data = reinterpret_cast<int16_t*>(data);
959 for (uint32_t j=2;j<numElems;j++)
960 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
961
962 return numElems*sizeof(uint16_t);
963 }
964
965
966
967 //thread related stuff
968 MemoryManager fMemPool; ///< Actual memory manager, providing memory for the compression buffers
969 int32_t fNumQueues; ///< Current number of threads that will be used by this object
970 uint64_t fMaxUsableMem; ///< Maximum number of bytes that can be allocated by the memory manager
971 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
972
973 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)
974 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)
975
976 // catalog related stuff
977 CatalogType fCatalog; ///< Catalog for this file
978 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)
979 uint32_t fNumTiles; ///< Number of pre-reserved tiles
980 uint32_t fNumRowsPerTile; ///< Number of rows per tile
981 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file
982
983 // checksum related stuff
984 Checksum fCatalogSum; ///< Checksum of the catalog
985 Checksum fRawSum; ///< Raw sum (specific to FACT)
986 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
987
988 // data layout related stuff
989 /// Regular columns augmented with compression informations
990 struct CompressedColumn
991 {
992 CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
993 block_head(h)
994 {}
995#ifdef __MARS__ // Needed for the compilation ofthe dictionary
996 CompressedColumn() { }
997#endif
998 Table::Column col; ///< the regular column entry
999 FITS::Compression block_head; ///< the compression data associated with that column
1000 };
1001 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
1002 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row
1003 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
1004 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression
1005
1006 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
1007 int fErrno; ///< propagate errno to main thread
1008};
1009
1010#endif
Note: See TracBrowser for help on using the repository browser.