source: trunk/Mars/mcore/zofits.h @ 17262

Last change on this file since 17262 was 17262, checked in by tbretz, 11 years ago:
Removed the 'using namespace' from header
File size: 39.2 KB
#ifndef FACT_zofits
#define FACT_zofits

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#endif
class zofits : public ofits
{
    /// Override of begin() so that it returns the smallest item in the list instead of the actual first element
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };


    //catalog types
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}
        int64_t first;   ///< Size of this column in the tile
        int64_t second;  ///< Offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;
    typedef list<CatalogRow>     CatalogType;


    /// Parameters required to write a tile to disk
    struct WriteTarget
    {
        bool operator < (const WriteTarget& other) const
        {
            return tile_num < other.tile_num;
        }

        WriteTarget() { }
        WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }

        uint32_t         tile_num; ///< Tile index of the data (to make sure that tiles are written in the correct order)
        uint32_t         size;     ///< Size to write
        shared_ptr<char> data;     ///< Memory block to write
    };


    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        CatalogRow&      catalog_entry;  ///< Reference to the catalog entry to deal with
        shared_ptr<char> src;            ///< Original data
        shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget      target;         ///< Compressed data
        uint32_t         num_rows;       ///< Number of rows to compress
    };

public:
    /// Static setter/getter for the default number of threads to use. -1 means all available physical cores
    static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>=-1) n=_n; return n; }
    static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
    static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }

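    // The setters above follow the "function-local static" idiom: called with an
    // argument they change the process-wide default, called without one they only
    // read it. A short sketch (the values are examples, not recommendations):
    //
    //   zofits::DefaultNumThreads(4);        // compress with 4 worker threads
    //   zofits::DefaultMaxMemory(2000000);   // ~2 GB of compression buffers (value in KB)
    //   zofits::DefaultNumRowsPerTile(250);  // group 250 rows per compressed tile
    //   zofits file("out.fits.fz");          // objects constructed now pick these up
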
    /// Constructors
    /// @param numTiles how many data tiles should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
    zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename
    /// @param numTiles how many data tiles should be pre-reserved?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
    zofits(const char* fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

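    // A minimal end-to-end sketch. The file name, column layout and row source are
    // hypothetical; the call order (open, AddColumn, WriteTableHeader, WriteRow,
    // close) follows the methods defined below:
    //
    //   std::vector<int16_t> row(1440);  // one event: 1440 16-bit samples
    //   zofits out;
    //   out.open("events.fits.fz");      // also adds the compression header keys
    //   out.AddColumn(FITS::kFactHuffman16, 1440, 'I', "Data", "mV", "sample data");
    //   out.WriteTableHeader("Events");  // allocates buffers, starts the queues
    //   for (uint32_t i=0; i<1000; i++)
    //       out.WriteRow(row.data(), out.GetBytesPerRow());
    //   out.close();                     // flushes the last tile and the catalog
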
    /// Initialization of the member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM, in bytes, to be used by the compression buffers
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        fCheckOffset = 0;
        fNumQueues   = 0;

        fNumTiles       = nt==0 ? 1 : nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth  = 0;
        fCatalogOffset = 0;
        fCatalogSize   = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = exception_ptr();
#endif
        fErrno = 0;
    }

    /// Write the header of the binary table
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }

    /// Open a new file
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether the EXTNAME keyword should be added to the header
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE",   true,            "Table is compressed");
        SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
        SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
        SetInt(  "ZPCOUNT",  0,               "");
        SetInt(  "ZHEAPPTR", 0,               "");
        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt(  "THEAP",    0,               "");
        SetStr(  "RAWSUM",   " 0",            "Checksum of raw little endian data");
        SetFloat("ZRATIO",   0,               "Compression ratio");
        SetInt(  "ZSHRINK",  1,               "Catalog shrink factor");

        fCatalogSize = 0;
        fCatalog.clear();
        fRawSum.reset();
    }

    /// Base-class stub: does nothing, as zofits does not know about DrsOffsets
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    /// Write the data catalog
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;

        //swap the catalog bytes before writing
        vector<char> swapped_catalog(total_catalog_size);

        uint32_t shift = 0;
        for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        //first time writing? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        //remember where we came from
        const off_t where_are_we = tellp();

        //write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        //update the checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
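
    // For reference, the catalog laid out above follows the tile-compression
    // convention used by this writer: one catalog row per tile and, for every
    // column, a pair of big-endian int64 values (revcpy performs the byte swap):
    //
    //   tile 0: [size(col0), offset(col0)] [size(col1), offset(col1)] ...
    //   tile 1: [size(col0), offset(col0)] ...
    //
    // Offsets are counted from the start of the heap; close() later zeroes the
    // offset of any column whose compressed size is zero.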

    /// Applies the DrsOffsets calibration to the data. Does nothing, as zofits knows nothing about drsoffsets.
    virtual void DrsOffsetCalibrate(char* )
    {
    }

    CatalogRow& AddOneCatalogRow()
    {
        //add one row to the catalog
        fCatalog.emplace_back(CatalogRow());
        fCatalog.back().resize(fTable.num_cols);
        for (auto it=fCatalog.back().begin(); it!=fCatalog.back().end(); it++)
            *it = CatalogEntry(0,0);

        fCatalogSize++;

        return fCatalog.back();
    }

    /// Write one row of data
    /// Note: in a multi-threaded environment (NumThreads>0), the return code should be checked rather
    /// than the badbit() of the stream (it might have been set by a thread before errno has been set).
    /// errno will then contain the correct error number of the last error which happened during writing.
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif
        //copy the current row to the pool of rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data for RAWSUM checksumming.
        //Ideally this should be moved to the threads.
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksummed when the tile is full...
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        DrsOffsetCalibrate(target_location);

        if (fTable.num_rows % fNumRowsPerTile != 0)
        {
            errno = fErrno;
            return errno==0;
        }

        CompressionTarget compress_target(AddOneCatalogRow());
        SetNextCompression(compress_target);

        //no worker threads: do everything inline
        if (fNumQueues == 0)
        {
            const uint64_t size_to_write = CompressBuffer(compress_target);

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");

            //the correct 'errno' is set, because it is the main thread
            return good();
        }

        //otherwise pick the least-loaded queue; if all queues are empty, use queue 0
        uint32_t min_index     = 0;
        uint32_t min_size      = numeric_limits<uint32_t>::max();
        uint32_t current_index = 0;

        for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
        {
            if (it->size() < min_size)
            {
                min_index = current_index;
                min_size  = it->size();
            }
            current_index++;
        }

        if (!fCompressionQueues[min_index].emplace(compress_target))
            throw runtime_error("The compression queues are not started. Did you close the file before writing this row?");

        errno = fErrno;
        return errno==0;
    }

    /// Update the real number of rows
    void FlushNumRows()
    {
        SetInt("NAXIS2",  (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    /// Set up the environment to compress yet another tile of data
    /// @param target the struct where the produced parameters are stored
    void SetNextCompression(CompressionTarget& target)
    {
        //fill up the compression target
        target.src            = fSmartBuffer;
        target.transposed_src = fMemPool.malloc();
        target.num_rows       = fTable.num_rows;

        //fill up the write-to-disk target
        WriteTarget &write_target = target.target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.data     = fMemPool.malloc();

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
    }

    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    void ShrinkCatalog()
    {
        //add empty rows to reach either the target number of rows, or a multiple of the allowed size
        for (uint32_t i=0; i<fCatalogSize%fNumTiles; i++)
            AddOneCatalogRow();

        //did we write more rows than the catalog can host?
        if (fCatalogSize > fNumTiles)
        {
            const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact, as extra rows were added just above

            //shrink the catalog!
            uint32_t entry_id = 1;
            auto it = fCatalog.begin();
            it++;
            for (; it != fCatalog.end(); it++)
            {
                if (entry_id >= fNumTiles) break;

                const uint32_t target_id = entry_id*shrink_factor;

                auto jt = it;
                for (uint32_t i=0; i<target_id-entry_id; i++)
                    jt++;

                *it = *jt;

                entry_id++;
            }

            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;

            //remove the surplus entries
            for (uint32_t i=0; i<num_tiles_to_remove; i++)
            {
                fCatalog.pop_back();
                fCatalogSize--;
            }

            //update the header keywords
            fNumRowsPerTile *= shrink_factor;

            SetInt("ZTILELEN", fNumRowsPerTile);
            SetInt("ZSHRINK",  shrink_factor);
        }
    }
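
    // A worked example of the shrinking logic (numbers are illustrative): with
    // fNumTiles==100 reserved catalog rows but fCatalogSize==250 written ones,
    // the loop above first pads the catalog to 300 rows, so shrink_factor==3.
    // Every third entry is then kept (0, 3, 6, ..., 297), the 200 surplus rows
    // are dropped, and ZTILELEN is tripled: each remaining catalog row now
    // describes a "super-tile" spanning three of the original tiles.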

    /// Close an open file
    /// @return the state of the file
    bool close()
    {
        //stop the compression and write threads
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
            return false;

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        //if so, re-throw the exception that was generated
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            //set the number of threads to zero before calling CompressBuffer
            const int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;

            const uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            const WriteTarget write_target(compress_target.target, size_to_write);
            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
        }

        AlignTo2880Bytes();

        int64_t heap_size         = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            compressed_offset += sizeof(FITS::TileHeader);
            heap_size         += sizeof(FITS::TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size         += (*it)[j].first;
                (*it)[j].second    = compressed_offset;
                compressed_offset += (*it)[j].first;
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        ShrinkCatalog();

        //update the header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);
        SetStr("RAWSUM", to_string(fRawSum.val()));

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }

    /// Overload of the ofits method: just calls the zofits-specific one with default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    /// Overload of the simplified compressed version
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        const size_t size = SizeFromType(typechar);

        Table::Column col;
        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(col, comp);

        ostringstream str;
        str << "format of " << name << " [" << CommentFromType(typechar) << "]";

        SetStr("ZFORM"+to_string(fRealColumns.size()), to_string(cnt)+typechar, str.str());
        SetStr("ZCTYP"+to_string(fRealColumns.size()), "FACT", "Compression type: FACT");

        return true;
    }
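
    // Example declarations (a sketch): the first overload stores the column
    // uncompressed; the second accepts a FITS::Compression, which the kFactRaw
    // forwarding above suggests is implicitly constructible from a process
    // constant. Column names and sizes here are hypothetical:
    //
    //   AddColumn(1, 'E', "Energy", "GeV", "estimated energy");      // raw float
    //   AddColumn(FITS::kFactHuffman16, 1440, 'I', "Data", "mV");    // Huffman int16
    //
    // Each call widens fRealRowWidth by size*cnt bytes, so WriteRow expects its
    // input rows packed in exactly this declaration order.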

    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues; }
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        //get the number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = thread::hardware_concurrency();
#endif
        //could not detect the number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        //leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }

protected:

    /// Allocates the required objects
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksumming
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming
    }

    /// Actually does the writing to disk (and checksumming)
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char*    checkSumPointer = src+4;
        int32_t  extraBytes      = 0;
        uint32_t sizeToChecksum  = sizeToWrite;

        //should we extend the array to the left?
        if (fCheckOffset != 0)
        {
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }

        //should we extend the array to the right?
        if (sizeToChecksum%4 != 0)
        {
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write the data to disk
        write(src+4, sizeToWrite);

        return good();
    }

    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;

        //Can't get this to work in the thread. Printed the addresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate the data if required
        //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
        //for (uint32_t i=0; i<thisRoundNumRows; i++)
        //{
        //    char* target_location = target.src.get()->get() + fRealRowWidth*i;
        //    cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
        //    DrsOffsetCalibrate(target_location);
        //}

#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        fWriteToDiskQueue.emplace(target.target, compressed_size);

        //if used by the queue, always return true as the elements are not ordered
        return 1;
    }

    /// Write one compressed tile to disk. This is the method executed by the writing thread
    /// @param target the struct hosting the write parameters
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

#ifdef __EXCEPTIONS
        try
        {
#endif
            //could not write the data to disk
            if (!writeCompressedDataToDisk(target.data.get(), target.size))
                fErrno = errno;
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif
        return true;
    }

    /// Compress a given buffer based on its source and destination
    /// src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(FITS::TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            FITS::Compression& head = fRealColumns[i].block_head;

            //remember the offset of this column's block header
            const uint64_t previousOffset = compressedOffset;

            //skip the header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0; j<head.getNumProcs(); j++)
            {
                switch (head.getProc(j))
                {
                case FITS::kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;

                case FITS::kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;

                case FITS::kFactHuffman16:
                    if (head.getOrdering() == FITS::kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size is larger than the uncompressed one
            //if so, set the raw flag and redo it uncompressed
            if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {
                //de-smooth!
                if (head.getProc(0) == FITS::kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                FITS::Compression he;

                compressedOffset  = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(FITS::TileHeader));

        return compressedOffset;
    }

    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case FITS::kOrderByRow:
                //regular, "semi-transposed" copy
                for (uint32_t k=0; k<thisRoundNumRows; k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case FITS::kOrderByCol:
                //transposed copy
                for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                    for (uint32_t k=0; k<thisRoundNumRows; k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            };
        }
    }
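
    // To illustrate the two orderings for a column with col.num==3 over two rows
    // (rN = row, eN = array element; a schematic, not real data):
    //
    //   row-ordered source : r0e0 r0e1 r0e2 | r1e0 r1e1 r1e2
    //   kOrderByRow output : r0e0 r0e1 r0e2   r1e0 r1e1 r1e2   (block copied as-is)
    //   kOrderByCol output : r0e0 r1e0 | r0e1 r1e1 | r0e2 r1e2 (grouped per element)
    //
    // Grouping the same element of consecutive rows is what the kOrderByCol
    // branch of compressHUFFMAN16 and the smoothing stage operate on.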

    /// Specific compression functions
    /// @param dest the target buffer
    /// @param src the source buffer
    /// @param size number of bytes to copy
    /// @return number of bytes written
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    /// Do Huffman encoding
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data element
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string   huffmanOutput;
        uint32_t previousHuffmanSize = 0;

        //if we have fewer than 2 elements to compress, the Huffman encoder does not work (and has no point).
        //Just return a size larger than the uncompressed one to trigger raw storage.
        if (numRows < 2)
            return numRows*sizeOfElems*numRowElems + 1000;

        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types" << endl;
            return 0;
#endif
        }

        uint32_t huffmanOffset = 0;
        for (uint32_t j=0; j<numRowElems; j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset       += sizeof(uint32_t);
            previousHuffmanSize  = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    /// Applies Thomas' DRS4 smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (int j=numElems-1; j>1; j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }

    /// Apply the inverse transform of the integer smoothing
    /// @param data where to apply it
    /// @param numElems how many elements of type int16_t are stored in the buffer
    /// @return number of bytes modified
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        for (uint32_t j=2; j<numElems; j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }
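
    // A quick numeric check of the forward/backward pair (illustrative values):
    // applySMOOTHING runs right to left, so {10, 12, 14, 16} becomes
    // {10, 12, 14-(12+10)/2, 16-(14+12)/2} = {10, 12, 3, 3}. UnApplySMOOTHING
    // then runs left to right and restores {10, 12, 14, 16} exactly, because the
    // truncating integer division sees the same (already restored) neighbours.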


    //thread related stuff
    MemoryManager fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
    int32_t       fNumQueues;         ///< Current number of threads that will be used by this object
    uint64_t      fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t       fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

    vector<Queue<CompressionTarget>>          fCompressionQueues; ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< Writing queue (=thread)

    //catalog related stuff
    CatalogType fCatalog;        ///< Catalog for this file
    uint32_t    fCatalogSize;    ///< Actual catalog size (.size() is slow on large lists)
    uint32_t    fNumTiles;       ///< Number of pre-reserved tiles
    uint32_t    fNumRowsPerTile; ///< Number of rows per tile
    off_t       fCatalogOffset;  ///< Offset of the catalog from the beginning of the file

    //checksum related stuff
    Checksum fCatalogSum;  ///< Checksum of the catalog
    Checksum fRawSum;      ///< Raw sum (specific to FACT)
    int32_t  fCheckOffset; ///< Offset to the data pointer to calculate the checksum

    //data layout related stuff
    /// Regular columns augmented with compression information
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column     col;        ///< the regular column entry
        FITS::Compression block_head; ///< the compression data associated with that column
    };
    vector<CompressedColumn> fRealColumns;  ///< Vector hosting the columns of the file
    uint32_t                 fRealRowWidth; ///< Width in bytes of one uncompressed row
    shared_ptr<char>         fSmartBuffer;  ///< Smart pointer to the buffer where the incoming rows are written
    vector<char>             fRawSumBuffer; ///< Buffer used for checksumming the incoming data, before compression

#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< Exception pointer to store exceptions coming from the threads
#endif
    int fErrno; ///< Propagate errno to the main thread
};

#ifndef __MARS__
} //namespace std
#endif

#endif