source: trunk/Mars/mcore/zofits.h@ 17243

Last change on this file since 17243 was 17243, checked in by tbretz, 11 years ago
Encapsulated the header with a guard (I don't understand why this was not yet done); reworked the static setup functions to avoid the need for a source file if included in multiple targets; simplified SetNumThreads... a good documentation is enough.
File size: 41.5 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "zfits.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef USE_BOOST_THREADS
17#include <boost/thread.hpp>
18#endif
19
20#ifndef __MARS__
21namespace std
22{
23#else
24using namespace std;
25#endif
26
27class zofits : public ofits
28{
29 /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
30 template<class S>
31 struct QueueMin : std::list<S>
32 {
33 typename std::list<S>::iterator begin()
34 {
35 return min_element(std::list<S>::begin(), std::list<S>::end());
36 }
37 };
38
39 /// Parameters required to write a tile to disk
40 struct WriteTarget
41 {
42 bool operator < (const WriteTarget& other)
43 {
44 return tile_num < other.tile_num;
45 }
46
47 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
48 uint32_t size; ///< Size to write
49 shared_ptr<MemoryChunk> data; ///< Memory block to write
50 };
51
    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        shared_ptr<MemoryChunk> src;            ///< Original (row-ordered) data of the tile
        shared_ptr<MemoryChunk> transposed_src; ///< Scratch buffer that receives the transposed data
        WriteTarget             target;         ///< Destination buffer and tile index for the compressed result
        uint32_t                num_rows;       ///< Total rows written to the table when the tile was handed over (the tile's own row count is derived from it)
    };
60
61public:
62 /// static setter for the default number of threads to use. -1 means all available physical cores
63 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
64 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
65 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
66 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
67
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        NOTE(review): the value is multiplied by 1000 before being handed to the memory
    ///        pool and to InitMemberVariables, so it is effectively in units of 1000 bytes,
    ///        not bytes -- confirm the intended unit
    zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }

    /// @param fname the target filename (opened immediately through the ofits base class)
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    ///        (same x1000 unit remark as for the default constructor above)
    zofits(const char* fname,
           uint32_t numTiles    = DefaultMaxNumTiles(),
           uint32_t rowPerTile  = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
96
97 //initialization of member variables
98 /// @param nt number of tiles
99 /// @param rpt number of rows per tile
100 /// @param maxUsableMem max amount of RAM to be used by the compression buffers
101 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
102 {
103 if (nt == 0)
104 throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");
105
106 fCheckOffset = 0;
107 fNumQueues = 0;
108
109 fNumTiles = nt;
110 fNumRowsPerTile = rpt;
111
112 fBuffer = NULL;
113 fRealRowWidth = 0;
114 fCatalogExtraRows = 0;
115 fCatalogOffset = 0;
116
117 fMaxUsableMem = maxUsableMem;
118#ifdef __EXCEPTIONS
119 fThreadsException = exception_ptr();
120#endif
121 }
122
    /// write the header of the binary table
    /// Also allocates the compression buffers and, when worker threads are configured,
    /// starts the compression queues and the disk-writer queue.
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }
147
    /// open a new file.
    /// Delegates to ofits::open and registers the compression-specific header keywords
    /// (their values are placeholders here; the real values are set in close()).
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the name of the extension should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE", true, "Table is compressed");
        SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt( "ZPCOUNT", 0, "");
        SetInt( "ZHEAPPTR", 0, "");
        SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt( "THEAP", 0, "");
        SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");

        // reset the per-file state
        fCatalogExtraRows = 0;
        fRawSum.reset();
    }
169
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (derived, camera-specific writers may override it).
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
183
    /// Write the data catalog (per-tile, per-column size/offset table) to its reserved
    /// spot in the file, then restore the current write position.
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fCatalog.size()*one_catalog_row_size;

        // swap the catalog bytes before writing (byte-order swap done by revcpy)
        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk at the reserved catalog position
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
219
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// Hook for derived classes to modify a staged row in place before it is compressed.
    virtual void DrsOffsetCalibrate(char* )
    {

    }
225
226 /// Grows the catalog in case not enough rows were allocated
227 void GrowCatalog()
228 {
229 uint32_t orig_catalog_size = fCatalog.size();
230
231 fCatalog.resize(fCatalog.size()*2);
232 for (uint32_t i=orig_catalog_size;i<fCatalog.size(); i++)
233 {
234 fCatalog[i].resize(fTable.num_cols);
235 for (auto it=(fCatalog[i].begin()); it!=fCatalog[i].end(); it++)
236 *it = CatalogEntry(0,0);
237 }
238
239 fCatalogExtraRows += orig_catalog_size;
240 fNumTiles += orig_catalog_size;
241 }
242
    /// write one row of data
    /// Copies the row into the current tile buffer, folds it into the RAWSUM checksum and,
    /// whenever a tile becomes complete, dispatches it for compression (in-line when no
    /// worker threads are configured, otherwise to the least-loaded compression queue).
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal the uncompressed row width)
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        // the catalog is pre-allocated: writing beyond its capacity is an error
        if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
        {
//            GrowCatalog();
#ifdef __EXCEPTIONS
            throw runtime_error("Maximum number of rows exceeded for this file");
#else
            gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
            return false;
#endif
        }

        //copy current row to pool or rows waiting for compression
        char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....

        // stage the row at the 4-byte phase matching its absolute position in the file,
        // and zero the first and last words of the scratch buffer so stale bytes outside
        // the row do not pollute the checksum
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        // let derived classes calibrate the staged row in place
        DrsOffsetCalibrate(target_location);

        fTable.num_rows++;

        // tile complete ? then hand it over for compression
        if (fTable.num_rows % fNumRowsPerTile == 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            if (fNumQueues == 0)
            { //no worker threads. do everything in-line
                uint64_t size_to_write = CompressBuffer(compress_target);

                WriteTarget write_target;
                write_target.size = size_to_write;
                write_target.data = compress_target.target.data;
                write_target.tile_num = compress_target.target.tile_num;

                return WriteBufferToDisk(write_target);
            }
            else
            {
                //if all queues are empty, use queue 0
                uint32_t min_index = 0;
                uint32_t min_size = numeric_limits<uint32_t>::max();
                uint32_t current_index = 0;

                // pick the least loaded compression queue
                for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                {
                    if (it->size() < min_size)
                    {
                        min_index = current_index;
                        min_size = it->size();
                    }
                    current_index++;
                }

                if (!fCompressionQueues[min_index].post(compress_target))
                    throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
            }
        }

        return good();
    }
340
    /// update the real number of rows
    /// NAXIS2 holds the number of tiles (rounded up), ZNAXIS2 the uncompressed row count.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
348
    /// Setup the environment to compress yet another tile of data
    /// Hands the currently filled row buffer over to the target and installs a fresh
    /// buffer from the pool to receive the next incoming rows.
    /// @param target the struct where to host the produced parameters
    void SetNextCompression(CompressionTarget& target)
    {
        //get space for transposed data
        shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();

        //fill up write to disk target
        WriteTarget write_target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = transposed_data;
        target.target = write_target;
        target.num_rows = fTable.num_rows;

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();
    }
372
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Merges groups of shrink_factor consecutive tile rows into single catalog rows so the
    /// catalog fits back into its original allocation, then fixes the affected keywords.
    void ShrinkCatalog()
    {
        //did we write more rows than what the catalog could host ?
        if (fCatalogExtraRows != 0)
        {
            //how many rows can the regular catalog host ?
            const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
            //what's the shrink factor to be applied ?
            const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);

            //shrink the catalog !
            for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+= shrink_factor)
            {//add the elements one by one, so that the empty ones at the end (i.e. fTable.num_rows%shrink_factor) do not create havoc
                const uint32_t target_catalog_row = i/shrink_factor;
                //move data from current row (i) to target row
                for (uint32_t j=0; j<fTable.num_cols; j++)
                {
                    fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
                    fCatalog[target_catalog_row][j].first = 0;
                    uint64_t last_size = fCatalog[i][j].first;
                    uint64_t last_offset = fCatalog[i][j].second;

                    // accumulate the sizes of the merged tiles; a zero offset marks an
                    // unused entry and terminates the merge for this column
                    for (uint32_t k=1; k<shrink_factor; k++)
                    {
                        if (fCatalog[i+k][j].second != 0)
                        {
                            fCatalog[target_catalog_row][j].first += fCatalog[i+k][j].second - last_offset;
                        }
                        else
                        {
                            fCatalog[target_catalog_row][j].first += last_size;
                            break;
                        }
                        last_size = fCatalog[i+k][j].first;
                        last_offset = fCatalog[i+k][j].second;
                    }
                }
            }

            fCatalog.resize(fCatalog.size() - fCatalogExtraRows);

            //update header keywords
            const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
            const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
            SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
            SetInt("NAXIS2", new_num_tiles_written);
            SetInt("ZTILELEN", new_num_rows_per_tiles);
            // NOTE(review): raw debug output to cout -- consider routing through gLog or removing
            cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
            cout << "Num tiles written: " << new_num_tiles_written << endl;
        }
    }
425
    /// close an open file.
    /// Waits for all worker threads, flushes the last (partial) tile synchronously,
    /// finalizes the header keywords, rewrites the catalog and verifies the checksum.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Looks like the file has been closed already");
#else
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        if (fThreadsException != exception_ptr())
        {
            //if so, re-throw the exception that was generated
            rethrow_exception(fThreadsException);
        }
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            //set number of threads to zero before calling compressBuffer
            //(CompressBuffer then runs synchronously and re-throws immediately)
            int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;
            uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            WriteTarget write_target;
            write_target.size = size_to_write;
            write_target.data = compress_target.target.data;
            write_target.tile_num = compress_target.target.tile_num;

            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Something went wrong while writing the last tile...");
        }

        AlignTo2880Bytes();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);

        ostringstream str;
        str << fRawSum.val();
        SetStr("RAWSUM", str.str());

        // compute the total heap size and assign each catalog entry its offset from
        // the start of the heap (entries with zero size keep a zero offset)
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;

        for (uint32_t i=0; i<total_num_tiles_written; i++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size += sizeof(TileHeader);
            for (uint32_t j=0; j<fCatalog[i].size(); j++)
            {
                heap_size += fCatalog[i][j].first;
                fCatalog[i][j].second = compressed_offset;
                compressed_offset += fCatalog[i][j].first;
                if (fCatalog[i][j].first == 0)
                    fCatalog[i][j].second = 0;
            }
        }

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }
540
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// @return whether the column could be registered
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
546
    /// Overload of the simplified compressed version
    /// Registers the column in the base class as a single 'Q' (64-bit descriptor) entry
    /// while the real layout is tracked in fRealColumns; also writes the ZFORMn/ZCTYPn
    /// keywords describing the actual format and the compression scheme.
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        // record the column's true geometry and its offset inside an uncompressed row
        Table::Column col;
        size_t size = SizeFromType(typechar);

        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, comp));

        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar);
        SetStr(strKey.str(), strVal.str(), strCom.str());

        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Compression type FACT";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return true;
    }
582
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues;}
    /// Set the number of compression worker threads. Only allowed on a closed file.
    /// @param num requested thread count; values above the detected core count
    ///        (including the uint32_t representation of -1 = "all cores") are clamped
    ///        to cores-2, leaving one core for the main thread and one for the writer
    /// @return false if the file is currently open, true otherwise
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        // only rebuild the queues when the count actually changes
        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }
619
620protected:
621
    /// Allocates the required objects.
    /// Sizes the memory-pool chunks to hold one full tile (row data + per-column block
    /// headers + tile header + checksum padding) and pre-allocates the catalog.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();

        // scratch buffer for RAWSUM: row width rounded up to a 4-byte boundary
        // NOTE(review): when fRealRowWidth%4 == 0 this adds 4 extra bytes -- harmless slack ?
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming

        //give the catalog enough space
        fCatalog.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fCatalog[i].resize(fRealColumns.size());
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }
    }
642
    /// Actually does the writing to disk (and checksuming)
    /// The checksum is computed on 4-byte words, so the buffer is virtually extended
    /// with zeros on both sides to reach 4-byte alignment; fCheckOffset carries the
    /// resulting phase over to the next call.
    /// @param src the buffer to write; the first 4 bytes are reserved scratch space and are not written
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;
        if (fCheckOffset != 0)
        {//should we extend the array to the left ?
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }
        if (sizeToChecksum%4 != 0)
        {//should we extend the array to the right ?
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the alignment phase for the next chunk
        fCheckOffset = (4 - extraBytes)%4;
        //write data to disk (skipping the 4 reserved scratch bytes)
        write(src+4, sizeToWrite);

        return good();
    }
674
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            // store the exception so close() can re-throw it in the main thread;
            // when running synchronously (no queues) re-throw right away
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        // synchronous mode: the caller writes the tile itself and needs the size
        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        WriteTarget wt;
        wt.tile_num = target.target.tile_num;
        wt.size = compressed_size;
        wt.data = target.target.data;

        fWriteToDiskQueue.post(wt);

        // if used by the queue, always return true as the elements are not ordered
        return 1;
    }
715
716 /// Write one compressed tile to disk. This is the method executed by the writing thread
717 /// @param target the struct hosting the write parameters
718 bool WriteBufferToDisk(const WriteTarget& target)
719 {
720 //is this the tile we're supposed to write ?
721 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
722 return false;
723
724 fLatestWrittenTile++;
725
726#ifdef __EXCEPTIONS
727 try
728 {
729#endif
730 if (!writeCompressedDataToDisk(target.data.get()->get(), target.size))
731 {//could not write the data to disk
732 ostringstream str;
733 str << "An error occured while writing to disk: ";
734 if (eof())
735 str << "End-Of-File";
736 if (failbit)
737 str << "Logical error on i/o operation";
738 if (badbit)
739 str << "Writing error on i/o operation";
740#ifdef __EXCEPTIONS
741 throw runtime_error(str.str());
742#else
743 gLog << ___err___ << "ERROR - " << str.str() << endl;
744#endif
745 }
746#ifdef __EXCEPTIONS
747 }
748 catch(...)
749 {
750 fThreadsException = current_exception();
751 if (fNumQueues == 0)
752 rethrow_exception(fThreadsException);
753 }
754#endif
755 return true;
756 }
757
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
    {
        // the last tile of the file may be only partially filled
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            fCatalog[currentCatalogRow][i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0) continue;

            Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            // apply the column's processing sequence in order
            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                    case kFactRaw:
                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                        break;
                    case kFactSmoothing:
                        // in-place preprocessing; does not advance the output offset
                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                        break;
                    case kFactHuffman16:
                        if (head.getOrdering() == kOrderByCol)
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                        else
                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                        break;
                }
            }

            //check if compressed size is larger than uncompressed
            if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {//if so set flag and redo it uncompressed
//                cout << "Redoing uncompressed ! " << endl;
                //de-smooth !
                if (head.getProc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                // catalog entry: compressed size of this column in this tile
                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
        }

        // finally write the tile header into the space reserved at the beginning
        TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }
844
    /// Transpose a tile to a new buffer
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        // the last tile of the file may hold fewer rows than a full tile
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
                case kOrderByRow:
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {//regular, "semi-transposed" copy: each row's whole column array stays contiguous
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                    }
                    break;

                case kOrderByCol :
                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                        for (uint32_t k=0;k<thisRoundNumRows;k++)
                        {//transposed copy: all rows of one array element become contiguous
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                            dest += fRealColumns[i].col.size;
                        }
                    break;
            };
        }
    }
876
877 /// Specific compression functions
878 /// @param dest the target buffer
879 /// @param src the source buffer
880 /// @param size number of bytes to copy
881 /// @return number of bytes written
882 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
883 {
884 memcpy(dest, src, size);
885 return size;
886 }
887
    /// Do huffman encoding
    /// Each of the numRowElems element streams is encoded separately: a 32-bit size entry
    /// per stream is written first, followed by the concatenated huffman-encoded data.
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;
        if (numRows < 2)
        {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
            return numRows*sizeOfElems*numRowElems + 1000;
        }
        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << endl;
            return 0;
#endif
        }
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            // record the size of this element's stream (the output string keeps growing)
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }
        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        //(the caller detects the oversized return value and redoes the block uncompressed)
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
930
931 /// Applies Thomas' DRS4 smoothing
932 /// @param data where to apply it
933 /// @param numElems how many elements of type int16_t are stored in the buffer
934 /// @return number of bytes modified
935 uint32_t applySMOOTHING(char* data, uint32_t numElems)
936 {
937 int16_t* short_data = reinterpret_cast<int16_t*>(data);
938 for (int j=numElems-1;j>1;j--)
939 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
940
941 return numElems*sizeof(int16_t);
942 }
943
944 /// Apply the inverse transform of the integer smoothing
945 /// @param data where to apply it
946 /// @param numElems how many elements of type int16_t are stored in the buffer
947 /// @return number of bytes modified
948 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
949 {
950 int16_t* short_data = reinterpret_cast<int16_t*>(data);
951 //un-do the integer smoothing
952 for (uint32_t j=2;j<numElems;j++)
953 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
954
955 return numElems*sizeof(uint16_t);
956 }
957
958
959
    //thread related stuff
    MemoryManager fMemPool;        ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;            ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;        ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile;    ///< Index of the last tile written to disk (for correct ordering while using several threads); -1 = none yet

    vector<Queue<CompressionTarget>> fCompressionQueues;  ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin serves the lowest pending tile first

    // catalog related stuff
    /// One catalog cell: size and offset of one column's compressed block within one tile.
    /// Packed so the rows can be byte-swapped and written to the file directly.
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
        int64_t first;   ///< Size of this column in the tile
        int64_t second;  ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;
    typedef vector<CatalogRow> CatalogType;
    CatalogType fCatalog;          ///< Catalog for this file (one row per tile, one entry per column)
//    uint32_t fCatalogSize;       ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;            ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile;      ///< Number of rows per tile
    off_t fCatalogOffset;          ///< Offset of the catalog from the beginning of the file (0 = not written yet)
    uint32_t fCatalogExtraRows;    ///< Number of extra rows written on top of the initial capacity of the file

    // checksum related stuff
    Checksum fCatalogSum;          ///< Checksum of the catalog
    Checksum fRawSum;              ///< Raw sum (specific to FACT)
    int32_t fCheckOffset;          ///< offset to the data pointer to calculate the checksum (4-byte phase carry-over)

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;         ///< the regular column entry
        Compression block_head;    ///< the compression data associated with that column
    };
    vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;        ///< Width in bytes of one uncompressed row
    shared_ptr<MemoryChunk> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written
    char* fBuffer;                 ///< regular version of fSmartBuffer
    vector<char> fRawSumBuffer;    ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
1010
1011};
1012
1013#ifndef __MARS__
1014}; //namespace std
1015#endif
1016
1017#endif
Note: See TracBrowser for help on using the repository browser.