source: trunk/Mars/mcore/zofits.h@ 17263

Last change on this file since 17263 was 17263, checked in by tbretz, 11 years ago
Added more std:: namespace qualifiers
File size: 39.1 KB
#ifndef FACT_zofits
#define FACT_zofits

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */
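
/*
 * A minimal usage sketch (illustrative only: the file name, column layout and
 * row content below are hypothetical, not part of this API):
 *
 *     zofits f("test.fits.fz");         // create and open the output file
 *     f.AddColumn(1, 'I', "COUNT", ""); // one 16-bit column, stored uncompressed
 *     f.WriteTableHeader("DATA");       // write the header, start the queues
 *     const int16_t val = 42;
 *     f.WriteRow(&val, sizeof(val));    // cnt must equal GetBytesPerRow()
 *     f.close();                        // flush the last tile, write the catalog
 */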

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#endif

class zofits : public ofits
{
    /// Override of begin() to return the smallest item in the list instead of the true begin
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };


    //catalog types
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}
        int64_t first;  ///< Size of this column in the tile
        int64_t second; ///< Offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;
    typedef list<CatalogRow>     CatalogType;


    /// Parameters required to write a tile to disk
    struct WriteTarget
    {
        bool operator < (const WriteTarget& other)
        {
            return tile_num < other.tile_num;
        }

        WriteTarget() { }
        WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }

        uint32_t tile_num;     ///< Tile index of the data (to make sure that tiles are written in the correct order)
        uint32_t size;         ///< Size to write
        shared_ptr<char> data; ///< Memory block to write
    };


    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow& catalog_entry;       ///< Reference to the catalog entry to deal with
        shared_ptr<char> src;            ///< Original data
        shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget target;              ///< Compressed data
        uint32_t num_rows;               ///< Number of rows to compress
    };

public:
    /// static setter for the default number of threads to use. -1 means all available physical cores;
    /// the default argument (-2) leaves the current value unchanged.
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
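
    // The setters above use an out-of-range default argument as a "no change"
    // sentinel, so the same call works as getter and setter. For example
    // (hypothetical values):
    //
    //     zofits::DefaultNumRowsPerTile(1000); // tiles of 1000 rows for new files
    //     zofits::DefaultNumThreads(2);        // compress with two worker threads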

    /// constructors
    /// @param numTiles how many data groups should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory (in kB) can be used by the compression buffers
    zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how much memory (in kB) can be used by the compression buffers
    zofits(const char* fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// Initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues   = 0;

        fNumTiles       = nt==0 ? 1 : nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth  = 0;
        fCatalogOffset = 0;
        fCatalogSize   = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = exception_ptr();
#endif
        fErrno = 0;
    }

    /// write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }

    /// open a new file.
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the name of the extension should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE", true, "Table is compressed");
        SetInt(  "ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt(  "ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt(  "ZPCOUNT", 0, "");
        SetInt(  "ZHEAPPTR", 0, "");
        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt(  "THEAP", 0, "");
        SetStr(  "RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");
        SetInt(  "ZSHRINK", 1, "Catalog shrink factor");

        fCatalogSize = 0;
        fCatalog.clear();
        fRawSum.reset();
    }

    /// Super method. Does nothing as zofits does not know about DrsOffsets
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    /// Write the data catalog
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;

        // swap the catalog bytes before writing
        vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }

    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    virtual void DrsOffsetCalibrate(char*)
    {
    }

    CatalogRow& AddOneCatalogRow()
    {
        // add one row to the catalog
        fCatalog.emplace_back(CatalogRow());
        fCatalog.back().resize(fTable.num_cols);
        for (auto it=fCatalog.back().begin(); it!=fCatalog.back().end(); it++)
            *it = CatalogEntry(0,0);

        fCatalogSize++;

        return fCatalog.back();
    }

    /// write one row of data
    /// Note: in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before the errno has been set).
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
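    ///
    /// A typical caller-side check in the multi-threaded case (a sketch; writer
    /// and handle_error are placeholders):
    ///
    ///     if (!writer.WriteRow(row_buffer, writer.GetBytesPerRow()))
    ///         handle_error(errno); // errno holds the last write error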
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif

        //copy current row to the pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksumming.
        //Ideally this should be moved to the threads.
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksummed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        DrsOffsetCalibrate(target_location);

        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        CompressionTarget compress_target(AddOneCatalogRow());
        SetNextCompression(compress_target);

        //no worker threads: do everything in-line
        if (fNumQueues == 0)
        {
            const uint64_t size_to_write = CompressBuffer(compress_target);

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");

            // The correct 'errno' is set, because it is the main thread.
            return good();
        }

        //if all queues are empty, use queue 0
        uint32_t min_index = 0;
        uint32_t min_size  = numeric_limits<uint32_t>::max();
        uint32_t current_index = 0;

        for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
        {
            if (it->size() < min_size)
            {
                min_index = current_index;
                min_size  = it->size();
            }
            current_index++;
        }

        if (!fCompressionQueues[min_index].emplace(compress_target))
            throw runtime_error("The compression queues are not started. Did you close the file before writing this row?");

        errno = fErrno;
        return errno==0;
    }

    /// update the real number of rows
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    /// Set up the environment to compress yet another tile of data
    /// @param target the struct where to host the produced parameters
    void SetNextCompression(CompressionTarget& target)
    {
        //fill up compression target
        target.src            = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows       = fTable.num_rows;

        //fill up write to disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.data     = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
    }

    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
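    /// Example: with 1000 reserved tiles and 2500 tiles written, the catalog is
    /// first padded to 3000 rows, then every 3rd entry is kept (shrink factor 3)
    /// and ZTILELEN is multiplied by 3 so that readers still locate each row.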
    void ShrinkCatalog()
    {
        //add empty rows until the catalog size is a multiple of the allowed size
        while (fCatalogSize % fNumTiles != 0)
            AddOneCatalogRow();

        //did we write more rows than the catalog can host?
        if (fCatalogSize > fNumTiles)
        {
            const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above

            //shrink the catalog!
            uint32_t entry_id = 1;
            auto it = fCatalog.begin();
            it++;
            for (; it != fCatalog.end(); it++)
            {
                if (entry_id >= fNumTiles) break;

                const uint32_t target_id = entry_id*shrink_factor;

                auto jt = it;
                for (uint32_t i=0; i<target_id-entry_id; i++)
                    jt++;

                *it = *jt;

                entry_id++;
            }

            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

            //remove the surplus entries
            for (uint32_t i=0; i<num_tiles_to_remove; i++)
            {
                fCatalog.pop_back();
                fCatalogSize--;
            }

            //update header keywords
            fNumRowsPerTile *= shrink_factor;

            SetInt("ZTILELEN", fNumRowsPerTile);
            SetInt("ZSHRINK", shrink_factor);
        }
    }

    /// close an open file.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            //set the number of threads to zero before calling CompressBuffer
            const int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;

            const uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
        }

        AlignTo2880Bytes();

        int64_t heap_size         = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size         += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", to_string(fRawSum.val()));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }

    /// Overload of the ofits method. Just calls the zofits-specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        SetStr("ZFORM"+to_string(fRealColumns.size()), to_string(cnt)+typechar, "format of "+name+" "+CommentFromType(typechar));
        SetStr("ZCTYP"+to_string(fRealColumns.size()), "FACT", "Compression type: FACT");

        return true;
    }

    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        //get the number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }

protected:

    /// Allocates the required objects.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksumming
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming
    }

    /// Actually does the writing to disk (and checksumming)
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
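    ///
    /// The FITS checksum is computed over 32-bit words, but tiles are written
    /// back-to-back regardless of alignment: the buffer is therefore zero-padded
    /// on both sides as needed, and fCheckOffset carries the 4-byte phase of the
    /// stream over to the next call.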
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char*    checkSumPointer = src+4;
        int32_t  extraBytes      = 0;
        uint32_t sizeToChecksum  = sizeToWrite;

        //should we extend the array to the left?
        if (fCheckOffset != 0)
        {
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }

    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;

        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
//        for (uint32_t i=0;i<thisRoundNumRows;i++)
//        {
//            char* target_location = target.src.get()->get() + fRealRowWidth*i;
//            cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
//            DrsOffsetCalibrate(target_location);
//        }

#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        fWriteToDiskQueue.emplace(target.target, compressed_size);

        // if used by the queue, always return 1 as the elements are not ordered
        return 1;
    }

    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }

    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog row to fill with the size and offset of each compressed column
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //remember where this column's block starts, so its header can be written once the size is known
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0; j<head.getNumProcs(); j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size is larger than the uncompressed one
            //if so, set the flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {
                //de-smooth!
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset  = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }

    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case FITS::kOrderByRow:
                //regular, "semi-transposed" copy
                for (uint32_t k=0; k<thisRoundNumRows; k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case FITS::kOrderByCol:
                //transposed copy
                for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                    for (uint32_t k=0; k<thisRoundNumRows; k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            }
        }
    }

    /// Specific compression functions
    /// @param dest the target buffer
    /// @param src the source buffer
    /// @param size number of bytes to copy
    /// @return number of bytes written
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    /// Do huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data element
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
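    ///
    /// The output layout produced below is numRowElems uint32_t entries (the
    /// compressed size of each element's code stream), followed by the
    /// concatenated Huffman code streams themselves.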
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have fewer than 2 elements to compress, the Huffman encoder does not work (and has no point).
        //Just return a size larger than the uncompressed one to trigger raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types" << endl;
            return 0;
#endif
        }

        uint32_t huffmanOffset = 0;
        for (uint32_t j=0; j<numRowElems; j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    /// Applies Thomas' DRS4 smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
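    ///
    /// Example: {10, 12, 14, 16} becomes {10, 12, 3, 3}: working from the end,
    /// each element is replaced by its difference to the average of its two
    /// predecessors, i.e. 16-(14+12)/2 = 3 and then 14-(12+10)/2 = 3.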
    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (int j=numElems-1; j>1; j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }

    /// Applies the inverse transform of the integer smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (uint32_t j=2; j<numElems; j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }


    //thread related stuff
    MemoryManager fMemPool;     ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;     ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    vector<Queue<CompressionTarget>> fCompressionQueues;         ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread)

    // catalog related stuff
    CatalogType fCatalog;     ///< Catalog for this file
    uint32_t fCatalogSize;    ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;       ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile; ///< Number of rows per tile
    off_t fCatalogOffset;     ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum; ///< Checksum of the catalog
    Checksum fRawSum;     ///< Raw sum (specific to FACT)
    int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression information
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;            ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;                ///< Width in bytes of one uncompressed row
    shared_ptr<char> fSmartBuffer;         ///< Smart pointer to the buffer where the incoming rows are written
    vector<char> fRawSumBuffer;            ///< buffer used for checksumming the incoming data, before compression

#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< propagate errno to the main thread
};

#ifndef __MARS__
} //namespace std
#endif

#endif