source: trunk/Mars/mcore/zofits.h@ 17221

Last change on this file since 17221 was 17221, checked in by lyard, 11 years ago
better version of zofits. New factofits. Only cleanup, defines, comments and catalog shrinkage are missing
File size: 37.6 KB
Line 
1/*
2 * zofits.h
3 *
4 * FACT native compressed FITS writer
5 * Author: lyard
6 */
7
8#include "ofits.h"
9#include "Queue.h"
10#include "MemoryManager.h"
11
12#include "FITS.h"
13
14#ifdef USE_BOOST_THREADS
15#include <boost/thread.hpp>
16#endif
17
18using namespace FITS;
19
20#ifndef __MARS__
21namespace std
22{
23#else
24using namespace std;
25#endif
26
27class zofits : public ofits
28{
29 public:
30
31 struct WriteTarget
32 {
33 bool operator < (const WriteTarget& other)
34 {
35 return tile_num < other.tile_num;
36 }
37 uint32_t tile_num;
38 uint32_t size;
39 shared_ptr<MemoryChunk> target;
40 };
41
42 struct CompressionTarget
43 {
44 bool operator < (const CompressionTarget& other)
45 {
46 return target < other.target;
47 }
48 shared_ptr<MemoryChunk> src;
49 shared_ptr<MemoryChunk> transposed_src;
50 WriteTarget target;
51 uint32_t num_rows;
52 };
53
54
55 //constructors
    //Default-named constructor.
    //@param numTiles     size of the tiles catalog (maximum number of tiles)
    //@param rowPerTile   number of rows per tile
    //@param maxUsableMem maximum memory (bytes) for the pool — forwarded to MemoryManager;
    //                    presumably 0 means "no explicit limit", verify against MemoryManager.h
    zofits(uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(),
                                      fMemPool(0, maxUsableMem),
                                      fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumWorkingThreads(fNumQueues);
    }
65
    //Constructor opening a file right away.
    //@param fname        name of the file to open
    //@param numTiles     size of the tiles catalog (maximum number of tiles)
    //@param rowPerTile   number of rows per tile
    //@param maxUsableMem maximum memory (bytes) for the pool — forwarded to MemoryManager;
    //                    presumably 0 means "no explicit limit", verify against MemoryManager.h
    zofits(const char* fname,
           uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(fname),
                                      fMemPool(0, maxUsableMem),
                                      fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumWorkingThreads(fNumQueues);
    }
76
    //Destructor: nothing to release explicitly, the members clean up after themselves.
    //Note: does NOT call close() — callers are expected to close the file themselves.
    virtual ~zofits()
    {
    }
80
81 //initialization of member variables
82 void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
83 {
84 if (nt == 0)
85 throw runtime_error("Cannot work with a catalog of size 0. sorry.");
86
87 fCheckOffset = 0;
88
89 fNumTiles = nt;
90 fNumRowsPerTile = rpt;
91
92 fBuffer = NULL;
93 fRealRowWidth = 0;
94 fCatalogExtraRows = 0;
95
96 fCatalogOffset = 0;
97
98 fMaxUsableMem = maxUsableMem;
99#ifdef __EXCEPTIONS
100 fThreadsException = exception_ptr();
101#endif
102 }
103
104
105 //write the header of the binary table
106 virtual bool WriteTableHeader(const char* name="DATA")
107 {
108 if (!reallocateBuffers())
109 throw ("While allocating memory: apparently there not as much free memory as advertized...");
110
111 ofits::WriteTableHeader(name);
112
113 if (fNumQueues != 0)
114 {
115 //start the compression queues
116 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
117 it->start();
118
119 fWriteToDiskQueue.start();
120 }
121
122 //mark that no tile has been written so far
123 fLatestWrittenTile = -1;
124
125 return good();
126 }
127
    //Open a new file and write the compression-related placeholder keywords.
    //Their real values are filled in when the file is closed (see close()).
    //@param filename      the name of the file to open
    //@param addEXTNAMEKey whether the EXTNAME keyword should be added (forwarded to ofits)
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries (placeholder values, updated in close())
        SetBool("ZTABLE", true, "Table is compressed");
        SetInt("ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt("ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt("ZPCOUNT", 0, "");
        SetInt("ZHEAPPTR", 0, "");
        SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt("THEAP", 0, "");
        //NOTE(review): the value likely needs FITS-style right-justified padding —
        //verify against the original source (the web scrape may have collapsed spaces)
        SetStr("RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");

        //reset per-file state
        fCatalogExtraRows = 0;
        fRawSum.reset();
    }
146
    //Hook for writing a DRS offsets calibration table.
    //The base implementation writes nothing; subclasses may override it.
    //@return the state of the output stream
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
151
    //@return the size in bytes of one uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
156
    //Write the tiles catalog at its reserved location in the file.
    //A byte-swapped copy of the catalog is built first (FITS data is not in host
    //byte order — see the RAWSUM "little endian" keyword), then written in place;
    //the stream position is restored afterwards. Also refreshes fCatalogSum.
    //@return the state of the output stream
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fCatalog.size()*one_catalog_row_size;

        //build the byte-swapped copy of the catalog
        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        //the first call defines where the catalog lives in the file
        if (fCatalogOffset == 0)
        {
            fCatalogOffset = tellp();
        }

        //write the catalog at its reserved location, then seek back if needed
        const off_t where_are_we = tellp();

        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        //update the catalog checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
    //Hook to apply a DRS offset calibration to one row of data, in place.
    //The base implementation does nothing; subclasses may override it.
    virtual void DrsOffsetCalibrate(char* )
    {

    }
191
192 void GrowCatalog()
193 {
194 uint32_t orig_catalog_size = fCatalog.size();
195
196 fCatalog.resize(fCatalog.size()*2);
197 for (uint32_t i=orig_catalog_size;i<fCatalog.size(); i++)
198 {
199 fCatalog[i].resize(fTable.num_cols);
200 for (auto it=(fCatalog[i].begin()); it!=fCatalog[i].end(); it++)
201 *it = CatalogEntry(0,0);
202 }
203
204 fCatalogExtraRows += orig_catalog_size;
205 fNumTiles += orig_catalog_size;
206 }
207
    //Write one row of data. The row is staged into the current tile buffer; when the
    //tile becomes full it is handed to a compression queue (or compressed inline if
    //no worker threads are configured).
    //@param ptr the source buffer (one full row)
    //@param cnt the size of the row in bytes: must equal the declared row width
    //@param byte_swap unused here; kept for interface compatibility
    //@return the state of the output stream
    bool WriteRow(const void* ptr, size_t cnt, bool byte_swap=true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        //catalog full: refuse the row (GrowCatalog support is disabled for now)
        if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
        {
//            GrowCatalog();
#ifdef __EXCEPTIONS
            throw runtime_error("Maximum number of rows exceeded for this file");
#else
            gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
            return false;
#endif
        }

        //copy current row to pool or rows waiting for compression
        char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads, along with the drs-offset-calibration
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        //zero the 4 bytes at both ends of the staging buffer, so that stale bytes
        //from the previous row do not leak into the 4-byte-aligned checksum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        //apply the DRS offset calibration in place (no-op in this base class)
        DrsOffsetCalibrate(target_location);

        fTable.num_rows++;

        //tile complete: compress it, either inline or through a worker queue
        if (fTable.num_rows % fNumRowsPerTile == 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            if (fNumQueues == 0)
            { //no worker threads. do everything in-line
                uint64_t size_to_write = CompressBuffer(compress_target);

                WriteTarget write_target;
                write_target.size = size_to_write;
                write_target.target = compress_target.target.target;
                write_target.tile_num = compress_target.target.tile_num;

                if (!WriteBufferToDisk(write_target))
                    throw runtime_error("Something went wrong while writing to disk");
            }
            else
            {
                //post the tile to the least loaded compression queue
                //if all queues are empty, use queue 0
                uint32_t min_index = 0;
                uint32_t min_size = numeric_limits<uint32_t>::max();
                uint32_t current_index = 0;

                for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                {
                    if (it->size() < min_size)
                    {
                        min_index = current_index;
                        min_size = it->size();
                    }
                    current_index++;
                }

                if (!fCompressionQueues[min_index].post(compress_target))
                    throw runtime_error("I could not post this buffer. This does not make sense...");
            }
        }

        return good();
    }
302
    //Update the row-count keywords and flush the header to disk.
    //NAXIS2 (number of catalog rows, i.e. tiles) uses a floor division here: only
    //complete tiles are counted. NOTE(review): close() uses a ceiling instead —
    //presumably the floor is intentional mid-stream since the last, partial tile
    //is not on disk yet; verify.
    void FlushNumRows()
    {
        SetInt("NAXIS2", fTable.num_rows/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
309
    //Set up a compression target for the tile that was just completed, and swap in
    //a fresh buffer to host the next incoming rows.
    //@param target the compression target to fill in
    void SetNextCompression(CompressionTarget& target)
    {
        //get space for transposed data
        shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();

        //fill up write to disk target
        WriteTarget write_target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.target = fMemPool.malloc();

        //fill up compression target: the current buffer becomes the tile's source
        target.src = fSmartBuffer;
        target.transposed_src = transposed_data;
        target.target = write_target;
        target.num_rows = fTable.num_rows;

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();
    }
331
    //Shrink the catalog back down by merging groups of consecutive tiles, when extra
    //rows were added beyond the initial allocation (see GrowCatalog). Updates the
    //header keywords describing the tiles layout accordingly.
    void ShrinkCatalog()
    {
        //did we write more rows than what the catalog could host ?
        if (fCatalogExtraRows != 0)
        {
            //how many rows can the regular catalog host ?
            const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
            //what's the shrink factor to be applied ?
            const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);

            //NOTE(review): if shrink_factor evaluates to 1, the inner merge loop never
            //runs and every .first is reset to 0 — verify this case cannot happen here.

            //shrink the catalog !
            for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+= shrink_factor)
            {//add the elements one by one, so that the empty ones at the end (i.e. fTable.num_rows%shrink_factor) do not create havok
                const uint32_t target_catalog_row = i/shrink_factor;
                //move data from current row (i) to target row
                for (uint32_t j=0; j<fTable.num_cols; j++)
                {
                    fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
                    fCatalog[target_catalog_row][j].first = 0;
                    uint64_t last_size = fCatalog[i][j].first;
                    uint64_t last_offset = fCatalog[i][j].second;

                    //accumulate the sizes of the shrink_factor tiles being merged
                    for (uint32_t k=1; k<shrink_factor; k++)
                    {
                        if (fCatalog[i+k][j].second != 0)
                        {
                            fCatalog[target_catalog_row][j].first += fCatalog[i+k][j].second - last_offset;
                        }
                        else
                        {//empty entry: this was the last written tile for this column
                            fCatalog[target_catalog_row][j].first += last_size;
                            break;
                        }
                        last_size = fCatalog[i+k][j].first;
                        last_offset = fCatalog[i+k][j].second;
                    }
                }
            }

            fCatalog.resize(fCatalog.size() - fCatalogExtraRows);

            //update header keywords
            const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
            const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
            SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
            SetInt("NAXIS2", new_num_tiles_written);
            SetInt("ZTILELEN", new_num_rows_per_tiles);
            cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
            cout << "Num tiles written: " << new_num_tiles_written << endl;
        }
    }
383
    //Close an open file: waits for the worker threads to drain, compresses and writes
    //the last (partial) tile inline, finalizes all header keywords, the catalog
    //offsets and the checksums, then closes the underlying stream.
    //@return true if the resulting file checksum is valid
    bool close()
    {
        //wait for all pending compression and writing to be done
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Something went wrong while writing to disk...");
#else
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened to the compression threads
        if (fThreadsException != exception_ptr())
        {
            rethrow_exception(fThreadsException);
        }
#endif

        //compress and write the last, partial tile (if any) inline
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            //set number of threads to zero before calling compressBuffer,
            //so that it runs inline and does not post to the writing queue
            int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;
            uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            WriteTarget write_target;
            write_target.size = size_to_write;
            write_target.target = compress_target.target.target;
            write_target.tile_num = compress_target.target.tile_num;

            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Something went wrong while writing the last tile...");
        }

        AlignTo2880Bytes();

        //update header keywords with the final values
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
        SetInt("ZHEAPPTR", heap_offset);

        //number of tiles actually written: ceiling, to account for a partial last tile
        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;

        SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);

        SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
        SetInt("NAXIS2", total_num_tiles_written);

        ostringstream str;
        str << fRawSum.val();
        SetStr("RAWSUM", str.str());

        //compute the total heap size and assign the final offset of each catalog entry
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;

        for (uint32_t i=0; i<total_num_tiles_written; i++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size += sizeof(TileHeader);
            for (uint32_t j=0; j<fCatalog[i].size(); j++)
            {
                heap_size += fCatalog[i][j].first;
                fCatalog[i][j].second = compressed_offset;
                compressed_offset += fCatalog[i][j].first;
                //an empty block keeps a null offset
                if (fCatalog[i][j].first == 0)
                    fCatalog[i][j].second = 0;
            }
        }

        float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");


        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }
495
496 //Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
497 bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
498 {
499 BlockHeaderWriter head;
500 return AddColumn(cnt, typechar, name, unit, head, comment, addHeaderKeys);
501 }
502
503 bool AddColumn(const string& compressionScheme, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
504 {
505 BlockHeaderWriter head(compressionScheme);
506 return AddColumn(cnt, typechar, name, unit, head, comment, addHeaderKeys);
507 }
    //Add a compressed column, using the given BlockHeaderWriter as compression scheme.
    //The column is declared to the underlying ofits as a single 'Q' (variable-length)
    //entry, while its real (uncompressed) layout is tracked in fRealColumns and
    //advertised through the ZFORMn / ZCTYPn keywords.
    //@return false if the column could not be added to the underlying table
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const BlockHeaderWriter& header, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        //remember the real (uncompressed) layout of this column
        Table::Column col;
        size_t size = SizeFromType(typechar);

        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, header));

        //ZFORMn: the real format of the column (1-based index)
        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar);
        SetStr(strKey.str(), strVal.str(), strCom.str());

        //ZCTYPn: the compression type of the column
        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Compression type FACT";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return true;
    }
542
    //Convenience overloads: one per FITS type character, forwarding to the
    //string-scheme AddColumn with the matching type code.
    bool AddColumnShort(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'I', name, unit, comment); }
    bool AddColumnInt(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'J', name, unit, comment); }
    bool AddColumnLong(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'K', name, unit, comment); }
    bool AddColumnFloat(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'E', name, unit, comment); }
    bool AddColumnDouble(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'D', name, unit, comment); }
    bool AddColumnChar(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'A', name, unit, comment); }
    bool AddColumnByte(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'B', name, unit, comment); }
    bool AddColumnBool(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
    { return AddColumn(compressionScheme, cnt, 'L', name, unit, comment); }
559
    //Set the (static) number of compression threads used when configuring zofits objects.
    //NOTE(review): this does not resize the queues of already-constructed objects —
    //use SetNumWorkingThreads for that; verify intended usage.
    static void SetNumThreads(int32_t num) { fNumQueues = num;}
    //@return the current (static) number of compression threads
    static int32_t GetNumThreads() { return fNumQueues;}
562 protected:
563
564 bool SetNumWorkingThreads(int32_t num)
565 {
566 if (is_open())
567 {
568#ifdef __EXCEPTIONS
569 throw runtime_error("File must be closed before changing the number of compression threads");
570#else
571 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
572#endif
573 return false;
574 }
575#ifdef USE_BOOST_THREADS
576 int32_t num_available_cores = boost::thread::hardware_concurrency();
577#else
578 int32_t num_available_cores = thread::hardware_concurrency();
579#endif
580
581 if (num_available_cores == 0)
582 {//could not detect number of available cores from system properties...
583 //Assuming that 5 cores are availables (4 compression, 1 write)
584 num_available_cores = 5;
585 }
586 if (num > num_available_cores)
587 {
588 ostringstream str;
589 str << "Number of threads cannot be greater than physically available (" << num_available_cores << ")";
590#ifdef __EXCEPTIONS
591 throw runtime_error(str.str());
592#else
593 gLog << ___err___ << "ERROR - " << str.str();
594#endif
595 return false;
596 }
597
598 if (num == -1)
599 num = num_available_cores-2; // 1 for writing, one for the main thread
600
601 if (fCompressionQueues.size() == (uint32_t)num)
602 return true;
603
604 //cannot be const, as resize does not want it that way
605 Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false, false);
606
607 //shrink
608 if ((uint32_t)num < fCompressionQueues.size())
609 {
610 fCompressionQueues.resize(num, queue);
611 return true;
612 }
613
614 //grow
615 fCompressionQueues.resize(num, queue);
616
617 fNumQueues = num;
618
619 return true;
620 }
621
    //Allocate the buffers required for a new table: memory-pool chunks sized for one
    //full compressed tile, the row-checksum staging buffer, and the tiles catalog.
    //@return true (allocation failures would surface from the memory pool itself)
    bool reallocateBuffers()
    {
        //one tile of data + per-column block headers + tile header + 8 bytes for checksuming
        size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();

        //one row, padded up to a multiple of 4 bytes for checksuming
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4);

        //give the catalog enough space, zero-initialized
        fCatalog.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fCatalog[i].resize(fRealColumns.size());
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }
        return true;
    }
642
    //Write compressed data to disk, folding it into the running data checksum.
    //The checksum works on 4-byte-aligned chunks: the 4 bytes in front of src are
    //reserved scratch space so the checksummed region can be extended to the left
    //when the previous write left a non-aligned tail (fCheckOffset).
    //@param src         pointer to the data, with 4 reserved leading bytes
    //@param sizeToWrite number of payload bytes (starting at src+4) to write
    //@return the state of the output stream
    bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;
        if (fCheckOffset != 0)
        {//should we extend the array to the left ?
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }
        if (sizeToChecksum%4 != 0)
        {//should we extend the array to the right ?
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        //remember the misalignment for the next write
        fCheckOffset = (4 - extraBytes)%4;
        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
670
    //Compress one tile of data: transpose it, compress it, and (when worker threads
    //are in use) post the result to the writing queue. Runs either in a worker thread
    //or inline when fNumQueues is 0. Exceptions raised in a worker are stored in
    //fThreadsException and re-thrown later (see close()).
    //@param target the tile to compress
    //@return the compressed size in bytes
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        //inline mode: the caller performs the writing itself
        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        WriteTarget wt;
        wt.tile_num = target.target.tile_num;
        wt.size = compressed_size;
        wt.target = target.target.target;

        fWriteToDiskQueue.post(wt);

        return compressed_size;
    }
707
    //Write one compressed tile to disk. Tiles must reach the disk strictly in order:
    //if this is not the tile expected next, the write is refused (returning false —
    //presumably the Queue re-posts it later; verify against Queue.h).
    //@param target the tile to write
    //@return false if the tile was not written (out of order, or stream failure)
    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write ?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

        //write the buffer to disk.
        return writeCompressedDataToDisk(target.target.get()->get(), target.size);
    }
719
720 //src cannot be const, as applySMOOTHING is done in place
    //Compress one (transposed) tile into dest, column by column, applying each
    //column's compression sequence. If a column ends up larger than uncompressed,
    //it is redone as a raw copy. Updates the catalog row of this tile with the
    //per-column compressed sizes and offsets.
    //src cannot be const, as applySMOOTHING is done in place
    //@param dest     output buffer; the first 4 bytes are reserved for checksum alignment
    //@param src      the transposed tile data
    //@param num_rows total rows written when the tile was closed (used to derive this
    //                tile's actual row count and its catalog row)
    //@return the size in bytes of the compressed tile (tile header included)
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
    {
        //the last tile of the file may be partial
        uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset=0;
        uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            fCatalog[currentCatalogRow][i].second = compressedOffset;

            //empty column: nothing to compress
            if (fRealColumns[i].col.num == 0) continue;

            BlockHeaderWriter& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.SizeOnDisk();

            //apply each processing of the compression sequence in turn
            for (uint32_t j=0;j<head.NumProcs();j++)
            {
                switch (head.Proc(j))
                {
                    case kFactRaw:
                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                        break;
                    case kFactSmoothing:
                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                        break;
                    case kFactHuffman16:
                        if (head.Ordering() == kOrderByCol)
                            compressedOffset += compressHUFFMAN(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                        else
                            compressedOffset += compressHUFFMAN(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                        break;
                    default:
                    {
                        ostringstream str;
                        str << "Unkown compression sequence entry: " << head.Proc(j);
#ifdef __EXCEPTIONS
                        throw runtime_error(str.str());
#else
                        gLog << ___err___ << "ERROR - " << str.str();
                        return 0;
#endif
                    }
                }
            }

            //check if compressed size is larger than uncompressed
            if ((head.Proc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.SizeOnDisk()))
            {//if so set flag and redo it uncompressed
                cout << "Redoing uncompressed ! " << endl;
                //de-smooth ! (the smoothing was applied to src in place)
                if (head.Proc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                //redo this column with a default (raw) block header
                BlockHeaderWriter he;
                compressedOffset = previousOffset + he.SizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                he.SetBlockSize(compressedOffset - previousOffset);
                he.Write(dest+previousOffset);
                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                continue;
            }

            //finalize the block header and record this block's size in the catalog
            head.SetBlockSize(compressedOffset - previousOffset);
            head.Write(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
        }

        //write the tile header in front of the compressed data
        TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }
808
    //Copy one tile while transposing it according to each column's requested ordering.
    //@param src  the tile as staged (row after row)
    //@param dest the destination buffer (one column's data after another)
    void copyTransposeTile(const char* src, char* dest)
    {
        //the last tile of the file may be partial
        uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.Ordering())
            {
                case kOrderByRow:
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {//regular, "semi-transposed" copy: this column's chunk of each row, in row order
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                    }
                    break;

                case kOrderByCol :
                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                        for (uint32_t k=0;k<thisRoundNumRows;k++)
                        {//transposed copy: all rows of one element before moving to the next element
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                            dest += fRealColumns[i].col.size;
                        }
                    break;
                default:
                {
                    ostringstream str;
                    str << "Unkown column ordering: " << fRealColumns[i].block_head.Ordering();
#ifdef __EXCEPTIONS
                    throw runtime_error(str.str());
#else
                    gLog << ___err___ << "ERROR - " << str.str();
                    return;
#endif
                }
            };
        }
    }
848
849 /// Specific compression functions
850 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
851 {
852 memcpy(dest, src, size);
853 return size;
854 }
855
    //Huffman-encode 16-bit data, one row-element at a time.
    //Output layout: one uint32_t per row-element giving the size of its encoded
    //stream, followed by the concatenated encoded streams.
    //@param dest        output buffer
    //@param src         input (transposed) column data
    //@param numRows     number of rows in this tile (must be >= 2 for encoding)
    //@param sizeOfElems size in bytes of one element (must be >= 2: 16-bit encoder)
    //@param numRowElems number of elements per row
    //@return the total output size; deliberately larger than the raw size when
    //        encoding is pointless, so the caller falls back to raw storage
    uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;
        if (numRows < 2)
        {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
            return numRows*sizeOfElems*numRowElems + 1000;
        }
        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Fatal ERROR: HUFMANN can only encode short or longer types");
#else
            gLog << ___err___ << "ERROR - Fatal ERROR: HUFMANN can only encode short or longer types";
            return 0;
#endif
        }
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            //record the size of this element's encoded stream
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }
        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
891
892 uint32_t applySMOOTHING(char* data, uint32_t numElems)//uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
893 {
894 int16_t* short_data = reinterpret_cast<int16_t*>(data);
895 for (int j=numElems-1;j>1;j--)
896 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
897
898 return numElems*sizeof(int16_t);
899 }
900 // Apply the inverse transform of the integer smoothing
901 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
902 {
903 int16_t* short_data = reinterpret_cast<int16_t*>(data);
904 //un-do the integer smoothing
905 for (uint32_t j=2;j<numElems;j++)
906 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
907
908 return numElems*sizeof(uint16_t);
909 }
    //Compressed data stuff
    int32_t fCheckOffset;     ///< offset to the data pointer to calculate the checksum
    uint32_t fNumTiles;       ///< number of tiles the catalog can host
    uint32_t fNumRowsPerTile; ///< number of rows per tile

    MemoryManager fMemPool;   ///< pool of reusable memory chunks (one chunk hosts one tile)

    //thread related stuff
    vector<Queue<CompressionTarget>> fCompressionQueues; ///< queues feeding the compression worker threads
    Queue<WriteTarget> fWriteToDiskQueue;                ///< queue feeding the disk-writer thread

    //thread related stuff
    static int32_t fNumQueues; ///< The number of threads that will be used to compress

    int32_t fLatestWrittenTile; ///< index of the latest tile written to disk (-1 if none yet)
#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< exception raised in a worker thread, re-thrown in close()
#endif
    //One catalog cell, packed so the catalog rows can be written to the file verbatim
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
        int64_t first;  ///< size of this column's compressed block (see close())
        int64_t second; ///< offset of this column's compressed block in the heap
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow; ///< one catalog row: one entry per column
    typedef vector<CatalogRow> CatalogType;  ///< the full catalog: one row per tile
    CatalogType fCatalog;         ///< the catalog of tiles
    Checksum fCatalogSum;         ///< checksum of the catalog
    Checksum fRawSum;             ///< checksum of the raw (uncompressed) data
    off_t fCatalogOffset;         ///< location of the catalog in the output file
    uint32_t fRealRowWidth;       ///< size in bytes of one uncompressed row
    uint32_t fCatalogExtraRows;   ///< catalog rows added beyond the initial allocation (see GrowCatalog)
    vector<char> fRawSumBuffer;   ///< staging buffer used to checksum one row at a time
    uint64_t fMaxUsableMem;       ///< maximum amount of memory (bytes) usable by the pool

    shared_ptr<MemoryChunk> fSmartBuffer; ///< buffer currently receiving the incoming rows
    char* fBuffer;                        ///< plain pointer to fSmartBuffer's data

    //Pairs the real column description with its compression scheme
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const BlockHeaderWriter& h) : col(c),
                                                                               block_head(h)
        {}
        Table::Column col;            ///< the real (uncompressed) column layout
        BlockHeaderWriter block_head; ///< the compression scheme of this column
    };
    vector<CompressedColumn> fRealColumns; ///< the real columns of the compressed table
958
959};
960
int32_t zofits::fNumQueues = 0; //default: no compression thread, everything runs inline
962
963#ifndef __MARS__
964}; //namespace std
965#endif
966
#ifdef crappy_example_usage
//Example usage of the zofits API. Never compiled: the guard macro is not defined
//anywhere (kept for documentation purposes only).
zofits zofitsfile(123456, 100);
zofitsfile.SetNumWorkingThreads(numThreads);
zofitsfile.open((fileNameOut).c_str());
std::zofits::BlockHeader zoheader(0, kOrderByRow, 2);
vector<uint16_t> smoothmanProcessings(2);
smoothmanProcessings[0] = kFactSmoothing;
smoothmanProcessings[1] = kFactHuffman16;

zofitsfile.AddColumn(sortedColumns[i].num,
                     sortedColumns[i].type,
                     colName,
                     "");

zofitsfile.AddColumn(sortedColumns[i].num,
                     sortedColumns[i].type,
                     colName,
                     "",
                     zoheader,
                     smoothmanProcessings);

zofitsfile.SetStr("ZCHKSUM", i->second.value, i->second.comment);
zofitsfile.SetDrsCalibration(drsCalibFloat);
zofitsfile.WriteTableHeader(tableName.c_str());
zofitsfile.WriteRow(buffer, rowWidth);
zofitsfile.close();

#endif
Note: See TracBrowser for help on using the repository browser.