source: trunk/FACT++/src/fitsDecompressor.cc@ 17020

Last change on this file since 17020 was 17007, checked in by lyard, 11 years ago
added decompressor program
File size: 78.3 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=zfits::kOrderByRow,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 bool changeHeaderKey(const string& origName, const string& newName);
427
428 ///@brief open a new fits file
429 bool open(const string& fileName, const string& tableName="Data");
430
431 ///@brief close the opened file
432 bool close();
433
434 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
435 bool writeBinaryRow(const char* bufferToWrite);
436
437 uint32_t getRowWidth();
438
439 ///@brief assign a given (already loaded) drs calibration
440 void setDrsCalib(int16_t* data);
441
442 ///@brief set the number of worker threads compressing the data.
443 bool setNumWorkingThreads(uint32_t num);
444
445 private:
446 ///@brief compresses one buffer of data, the one given by threadIndex
447 uint64_t compressBuffer(uint32_t threadIndex);
448
449 ///@brief writes an already compressed buffer to disk
450 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
451
452 ///@brief add the header checksum to the datasum
453 void addHeaderChecksum(Checksum& checksum);
454
455 ///@brief write the header. If closingFile is set to true, checksum is calculated
456 void writeHeader(bool closingFile = false);
457
458 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
459 void writeCatalog(bool closingFile=false);
460
461 /// FIXME this was a bad idea. Move everything to the regular header
462 vector<HeaderEntry> _defaultHeader;
463
464 /// the main function compressing the data
465 static void* threadFunction(void* context);
466
467 /// Write the drs calibration to disk, if any
468 void writeDrsCalib();
469
470 /// Copy and transpose (or semi-transpose) one tile of data
471 void copyTransposeTile(uint32_t index);
472
473 /// Specific compression functions
474 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
475 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
476 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
477 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
478
479 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
480 int16_t* _drsCalibData; ///< array of the Drs baseline mean
481 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
482 pthread_mutex_t _mutex; ///< mutex for compressing threads
483
484
485 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
486 static string _fitsHeader; ///< the default header to be written in every fits file
487};
488
489const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
490const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
491const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
492const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
493const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
494const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
495
496template<>
497void CompressedFitsFile::HeaderEntry::setValue(const string& v)
498{
499 string val = v;
500 if (val.size() > 2 && val[0] == '\'')
501 {
502 size_t pos = val.find_last_of("'");
503 if (pos != string::npos && pos != 0)
504 val = val.substr(1, pos-1);
505 }
506 ostringstream str;
507
508 str << "'" << val << "'";
509 for (int i=str.str().length(); i<20;i++)
510 str << " ";
511 _value = str.str();
512 buildFitsString();
513}
514
515/**
516 * Default header to be written in all fits files
517 */
518string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
519 "BITPIX = 8 / number of bits per data pixel "
520 "NAXIS = 0 / number of data axes "
521 "EXTEND = T / FITS dataset may contain extensions "
522 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
523 "DATASUM = ' 0' / Checksum for the data block "
524 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
525 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
526 "END "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " "
550 " "
551 " "
552 " "
553 " ";
554
555/**
556 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
557 *
558 */
559string CompressedFitsWriter::_emptyBlock = " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " "
591 " "
592 " "
593 " "
594 " ";
595
596CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
597/****************************************************************
598 * SUPER CLASS DEFAULT CONSTRUCTOR
599 ****************************************************************/
600CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
601 uint32_t numRowsPerTile) : _header(),
602 _columns(),
603 _numTiles(numTiles),
604 _numRowsPerTile(numRowsPerTile),
605 _totalNumRows(0),
606 _rowWidth(0),
607 _headerFlushed(false),
608 _buffer(NULL),
609 _checksum(0),
610 _heapPtr(0),
611 _transposedBuffer(1),
612 _compressedBuffer(1),
613 _numThreads(1),
614 _threadIndex(0),
615 _thread(1),
616 _threadNumRows(1),
617 _threadStatus(1)
618{
619 _catalog.resize(_numTiles);
620 _transposedBuffer[0] = NULL;
621 _compressedBuffer[0] = NULL;
622 _threadStatus[0] = _THREAD_WAIT_;
623 _threadNumRows[0] = 0;
624}
625
626/****************************************************************
627 * SUPER CLASS DEFAULT DESTRUCTOR
628 ****************************************************************/
629CompressedFitsFile::~CompressedFitsFile()
630{
631 if (_buffer != NULL)
632 {
633 _buffer = _buffer-4;
634 delete[] _buffer;
635 _buffer = NULL;
636 for (uint32_t i=0;i<_numThreads;i++)
637 {
638 _compressedBuffer[i] = _compressedBuffer[i]-4;
639 delete[] _transposedBuffer[i];
640 delete[] _compressedBuffer[i];
641 _transposedBuffer[i] = NULL;
642 _compressedBuffer[i] = NULL;
643 }
644 }
645 if (_file.is_open())
646 _file.close();
647}
648
649/****************************************************************
650 * REALLOCATE BUFFER
651 ****************************************************************/
652bool CompressedFitsFile::reallocateBuffers()
653{
654 if (_buffer)
655 {
656 _buffer = _buffer - 4;
657 delete[] _buffer;
658 for (uint32_t i=0;i<_compressedBuffer.size();i++)
659 {
660 _compressedBuffer[i] = _compressedBuffer[i]-4;
661 delete[] _transposedBuffer[i];
662 delete[] _compressedBuffer[i];
663 }
664 }
665 _buffer = new char[_rowWidth*_numRowsPerTile + 12];
666 if (_buffer == NULL) return false;
667 memset(_buffer, 0, 4);
668 _buffer = _buffer + 4;
669 if (_compressedBuffer.size() != _numThreads)
670 {
671 _transposedBuffer.resize(_numThreads);
672 _compressedBuffer.resize(_numThreads);
673 }
674 for (uint32_t i=0;i<_numThreads;i++)
675 {
676 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
677 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader) + 12]; //use a bit more memory for compression flags and checksumming
678 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
679 return false;
680 //shift the compressed buffer by 4 bytes, for checksum calculation
681 memset(_compressedBuffer[i], 0, 4);
682 _compressedBuffer[i] = _compressedBuffer[i]+4;
683 //initialize the tile header
684 TileHeader tileHeader;
685 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
686 }
687 return true;
688}
689
690/****************************************************************
691 * DEFAULT WRITER CONSTRUCTOR
692 ****************************************************************/
693CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
694 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
695 _checkOffset(0),
696 _drsCalibData(NULL),
697 _threadLooper(0)
698{
699 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
700 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
701 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
702 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
703 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
704 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
705 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
706 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
707 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
708 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
709 //compression stuff
710 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
711 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
712 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
713 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
714 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
715 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
716 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
717
718 pthread_mutex_init(&_mutex, NULL);
719}
720
721/****************************************************************
722 * DEFAULT DESTRUCTOR
723 ****************************************************************/
724CompressedFitsWriter::~CompressedFitsWriter()
725{
726 pthread_mutex_destroy(&_mutex);
727}
728
729/****************************************************************
730 * SET THE POINTER TO THE DRS CALIBRATION
731 ****************************************************************/
732void CompressedFitsWriter::setDrsCalib(int16_t* data)
733{
734 _drsCalibData = data;
735}
736
737/****************************************************************
738 * SET NUM WORKING THREADS
739 ****************************************************************/
740bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
741{
742 if (_file.is_open())
743 return false;
744 if (num < 1 || num > 64)
745 {
746 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
747 return false;
748 }
749 _numThreads = num;
750 _transposedBuffer[0] = NULL;
751 _compressedBuffer[0] = NULL;
752 _threadStatus.resize(num);
753 _thread.resize(num);
754 _threadNumRows.resize(num);
755 for (uint32_t i=0;i<num;i++)
756 {
757 _threadNumRows[i] = 0;
758 _threadStatus[i] = _THREAD_WAIT_;
759 }
760 return reallocateBuffers();
761}
762
763/****************************************************************
764 * WRITE DRS CALIBRATION TO FILE
765 ****************************************************************/
766void CompressedFitsWriter::writeDrsCalib()
767{
768 //if file was not loaded, ignore
769 if (_drsCalibData == NULL)
770 return;
771 uint64_t whereDidIStart = _file.tellp();
772 vector<HeaderEntry> header;
773 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
774 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
775 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
776 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
777 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
778 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
779 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
780 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
781 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
782 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
783 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
784 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
785 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
786
787 for (uint32_t i=0;i<header.size();i++)
788 _file.write(header[i].fitsString().c_str(), 80);
789 //End the header
790 _file.write("END ", 80);
791 long here = _file.tellp();
792 if (here%2880)
793 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
794 //now write the data itself
795 int16_t* swappedBytes = new int16_t[1024];
796 Checksum checksum;
797 for (int32_t i=0;i<1440;i++)
798 {
799 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
800 for (int32_t j=0;j<2048;j+=2)
801 {
802 int8_t inter;
803 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
804 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
805 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
806 }
807 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
808 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
809 }
810 uint64_t whereDidIStop = _file.tellp();
811 delete[] swappedBytes;
812 //No need to pad the data, as (1440*1024*2)%2880==0
813
814 //calculate the checksum from the header
815 ostringstream str;
816 str << checksum.val();
817 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
818 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
819 checksum.add(it->fitsString().c_str(), 80);
820 string end("END ");
821 string space(" ");
822 checksum.add(end.c_str(), 80);
823 int headerRowsLeft = 36 - (header.size() + 1)%36;
824 for (int i=0;i<headerRowsLeft;i++)
825 checksum.add(space.c_str(), 80);
826 //udpate the checksum keyword
827 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
828 //and eventually re-write the header data
829 _file.seekp(whereDidIStart);
830 for (uint32_t i=0;i<header.size();i++)
831 _file.write(header[i].fitsString().c_str(), 80);
832 _file.seekp(whereDidIStop);
833}
834
835/****************************************************************
836 * ADD COLUMN
837 ****************************************************************/
838bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
839{
840 if (_totalNumRows != 0)
841 {
842 cout << "Error: cannot add new columns once first row has been written" << endl;
843 return false;
844 }
845 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
846 {
847 if (it->name() == column.name())
848 {
849 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
850 return false;
851 }
852 }
853 _columns.push_back(column);
854 _columns.back().setOffset(_rowWidth);
855 _rowWidth += column.width();
856 reallocateBuffers();
857
858 ostringstream str, str2, str3;
859 str << "TTYPE" << _columns.size();
860 str2 << column.name();
861 str3 << "label for field ";
862 if (_columns.size() < 10) str3 << " ";
863 if (_columns.size() < 100) str3 << " ";
864 str3 << _columns.size();
865 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
866 str.str("");
867 str2.str("");
868 str3.str("");
869 str << "TFORM" << _columns.size();
870 str2 << "1QB";
871 str3 << "data format of field " << _columns.size();
872 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
873 str.str("");
874 str2.str("");
875 str3.str("");
876 str << "ZFORM" << _columns.size();
877 str2 << column.numElems() << column.type();
878 str3 << "Original format of field " << _columns.size();
879 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
880 str.str("");
881 str2.str("");
882 str3.str("");
883 str << "ZCTYP" << _columns.size();
884 str2 << column.getCompressionString();
885 str3 << "Comp. Scheme of field " << _columns.size();
886 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
887 //resize the catalog vector accordingly
888 for (uint32_t i=0;i<_numTiles;i++)
889 {
890 _catalog[i].resize(_columns.size());
891 for (uint32_t j=0;j<_catalog[i].size();j++)
892 _catalog[i][j] = make_pair(0,0);
893 }
894 return true;
895}
896
897/****************************************************************
898 * SET HEADER KEY
899 ****************************************************************/
900bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
901{
902 HeaderEntry ent = entry;
903 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
904 {
905 if (it->key() == entry.key())
906 {
907 if (entry.comment() == "")
908 ent.setComment(it->comment());
909 (*it) = ent;
910 _headerFlushed = false;
911 return true;
912 }
913 }
914 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
915 {
916 if (it->key() == entry.key())
917 {
918 if (entry.comment() == "")
919 ent.setComment(it->comment());
920 (*it) = ent;
921 _headerFlushed = false;
922 return true;
923 }
924 }
925 if (_totalNumRows != 0)
926 {
927 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
928 return false;
929 }
930 _header.push_back(entry);
931 _headerFlushed = false;
932 return true;
933}
934
935bool CompressedFitsWriter::changeHeaderKey(const string& origName, const string& newName)
936{
937 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
938 {
939 if (it->key() == origName)
940 {
941 (*it) = HeaderEntry(newName, it->value(), it->comment());
942 _headerFlushed = false;
943 return true;
944 }
945 }
946 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
947 {
948 if (it->key() == origName)
949 {
950 (*it) = HeaderEntry(newName, it->value(), it->comment());
951 _headerFlushed = false;
952 return true;
953 }
954 }
955 return false;
956}
957/****************************************************************
958 * OPEN
959 ****************************************************************/
960bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
961{
962 _file.open(fileName.c_str(), ios_base::out);
963 if (!_file.is_open())
964 {
965 cout << "Error: Could not open the file (" << fileName << ")." << endl;
966 return false;
967 }
968 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
969 _headerFlushed = false;
970 _threadIndex = 0;
971 //create requested number of threads
972 for (uint32_t i=0;i<_numThreads;i++)
973 pthread_create(&(_thread[i]), NULL, threadFunction, this);
974 //wait for the threads to start
975 while (_numThreads != _threadIndex)
976 usleep(1000);
977 //set the writing fence to the last thread
978 _threadIndex = _numThreads-1;
979 return (_file.good());
980}
981
982/****************************************************************
983 * WRITE HEADER
984 ****************************************************************/
985void CompressedFitsWriter::writeHeader(bool closingFile)
986{
987 if (_headerFlushed)
988 return;
989 if (!_file.is_open())
990 return;
991
992 long cPos = _file.tellp();
993
994 _file.seekp(0);
995
996 _file.write(_fitsHeader.c_str(), 2880);
997
998 //Write the DRS calib table here !
999 writeDrsCalib();
1000
1001 //we are now at the beginning of the main table. Write its header
1002 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
1003 _file.write(it->fitsString().c_str(), 80);
1004
1005 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1006 _file.write(it->fitsString().c_str(), 80);
1007
1008 _file.write("END ", 80);
1009 long here = _file.tellp();
1010 if (here%2880)
1011 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
1012
1013 _headerFlushed = true;
1014
1015 here = _file.tellp();
1016
1017 if (here%2880)
1018 cout << "Error: seems that header did not finish at the end of a block." << endl;
1019
1020 if (here > cPos && cPos != 0)
1021 {
1022 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
1023 return;
1024 }
1025
1026 here = _file.tellp();
1027 writeCatalog(closingFile);
1028
1029 here = _file.tellp() - here;
1030 _heapPtr = here;
1031
1032 if (cPos != 0)
1033 _file.seekp(cPos);
1034}
1035
1036/****************************************************************
1037 * WRITE CATALOG
1038 * WARNING: writeCatalog is only meant to be used by writeHeader.
1039 * external usage will most likely corrupt the file
1040 ****************************************************************/
1041void CompressedFitsWriter::writeCatalog(bool closingFile)
1042{
1043 uint32_t sizeWritten = 0;
1044 for (uint32_t i=0;i<_catalog.size();i++)
1045 {
1046 for (uint32_t j=0;j<_catalog[i].size();j++)
1047 {
1048 //swap the bytes
1049 int8_t swappedEntry[16];
1050 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1051 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1052 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1053 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1054 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1055 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1056 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1057 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1058
1059 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1060 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1061 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1062 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1063 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1064 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1065 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1066 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1067 if (closingFile)
1068 {
1069 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1070 }
1071 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1072 sizeWritten += 2*sizeof(int64_t);
1073 }
1074 }
1075
1076 //we do not reserve space for now because fverify does not like that.
1077 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1078 return ;
1079
1080 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1081 if (sizeWritten % 2880 != 0)
1082 {
1083 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1084 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1085 }
1086}
1087
1088/****************************************************************
1089 * ADD HEADER CHECKSUM
1090 ****************************************************************/
1091void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1092{
1093 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1094 _checksum.add(it->fitsString().c_str(), 80);
1095 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1096 _checksum.add(it->fitsString().c_str(), 80);
1097 string end("END ");
1098 string space(" ");
1099 checksum.add(end.c_str(), 80);
1100 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1101 for (int i=0;i<headerRowsLeft;i++)
1102 checksum.add(space.c_str(), 80);
1103}
1104
1105/****************************************************************
1106 * CLOSE
1107 ****************************************************************/
1108bool CompressedFitsWriter::close()
1109{
1110 for (uint32_t i=0;i<_numThreads;i++)
1111 while (_threadStatus[i] != _THREAD_WAIT_)
1112 usleep(100000);
1113 for (uint32_t i=0;i<_numThreads;i++)
1114 _threadStatus[i] = _THREAD_EXIT_;
1115 for (uint32_t i=0;i<_numThreads;i++)
1116 pthread_join(_thread[i], NULL);
1117 //flush the rows that were not written yet
1118 if (_totalNumRows%_numRowsPerTile != 0)
1119 {
1120 copyTransposeTile(0);
1121
1122 _threadNumRows[0] = _totalNumRows;
1123 uint32_t numBytes = compressBuffer(0);
1124 writeCompressedDataToDisk(0, numBytes);
1125 }
1126 //compression stuff
1127 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1128 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1129 //TODO calculate the real offset from the main table to the start of the HEAP data area
1130 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1131 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1132
1133 //regular stuff
1134 if (_catalog.size() > 0)
1135 {
1136 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1137 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1138 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1139 int64_t heapSize = 0;
1140 int64_t compressedOffset = 0;
1141 for (uint32_t i=0;i<_catalog.size();i++)
1142 {
1143 compressedOffset += sizeof(TileHeader);
1144 heapSize += sizeof(TileHeader);
1145 for (uint32_t j=0;j<_catalog[i].size();j++)
1146 {
1147 heapSize += _catalog[i][j].first;
1148// cout << "heapSize: " << heapSize << endl;
1149 //set the catalog offsets to their actual values
1150 _catalog[i][j].second = compressedOffset;
1151 compressedOffset += _catalog[i][j].first;
1152 //special case if entry has zero length
1153 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1154 }
1155 }
1156 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1157 }
1158 else
1159 {
1160 setHeaderKey(HeaderEntry("NAXIS1", _columns.size()*2*sizeof(int64_t), "width of table in bytes"));
1161 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1162 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1163 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1164 changeHeaderKey("THEAP", "ZHEAP");
1165 }
1166 ostringstream str;
1167
1168 writeHeader(true);
1169
1170 str.str("");
1171 str << _checksum.val();
1172
1173 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1174 addHeaderChecksum(_checksum);
1175 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1176 //update header value
1177 writeHeader();
1178 //update file length
1179 long here = _file.tellp();
1180 if (here%2880)
1181 {
1182 vector<char> nullVec(2880 - here%2880, 0);
1183 _file.write(nullVec.data(), 2880 - here%2880);
1184 }
1185 _file.close();
1186 return true;
1187}
1188
1189/****************************************************************
1190 * COPY TRANSPOSE TILE
1191 ****************************************************************/
1192void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1193{
1194 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1195
1196 //copy the tile and transpose it
1197 uint32_t offset = 0;
1198 for (uint32_t i=0;i<_columns.size();i++)
1199 {
1200 switch (_columns[i].getColumnOrdering())//getCompression())
1201 {
1202 case zfits::kOrderByRow:
1203 for (uint32_t k=0;k<thisRoundNumRows;k++)
1204 {//regular, "semi-transposed" copy
1205 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1206 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1207 }
1208 break;
1209
1210 case zfits::kOrderByCol :
1211 for (int j=0;j<_columns[i].numElems();j++)
1212 for (uint32_t k=0;k<thisRoundNumRows;k++)
1213 {//transposed copy
1214 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1215 offset += _columns[i].sizeOfElems();
1216 }
1217 break;
1218 default:
1219 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1220
1221 };
1222 }
1223}
1224
1225/****************************************************************
1226 * WRITE BINARY ROW
1227 ****************************************************************/
1228bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1229{
1230 if (_totalNumRows == 0)
1231 writeHeader();
1232
1233 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1234 _totalNumRows++;
1235 if (_totalNumRows%_numRowsPerTile == 0)
1236 {
1237 //which is the next thread that we should use ?
1238 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1239 usleep(100000);
1240
1241 copyTransposeTile(_threadLooper);
1242
1243 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1244 usleep(100000);
1245
1246 _threadNumRows[_threadLooper] = _totalNumRows;
1247 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1248 _threadLooper = (_threadLooper+1)%_numThreads;
1249 }
1250 return _file.good();
1251}
1252
1253uint32_t CompressedFitsWriter::getRowWidth()
1254{
1255 return _rowWidth;
1256}
1257
1258/****************************************************************
1259 * COMPRESS BUFFER
1260 ****************************************************************/
1261uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1262{
1263 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1264 return numRows*sizeOfElems*numRowElems;
1265}
1266
1267/****************************************************************
1268 * COMPRESS BUFFER
1269 ****************************************************************/
1270uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1271{
1272 string huffmanOutput;
1273 uint32_t previousHuffmanSize = 0;
1274 if (numRows < 2)
1275 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1276 return numRows*sizeOfElems*numRowElems + 1000;
1277 }
1278 if (sizeOfElems < 2 )
1279 {
1280 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1281 return 0;
1282 }
1283 uint32_t huffmanOffset = 0;
1284 for (uint32_t j=0;j<numRowElems;j++)
1285 {
1286 Huffman::Encode(huffmanOutput,
1287 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1288 numRows*(sizeOfElems/2));
1289 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1290 huffmanOffset += sizeof(uint32_t);
1291 previousHuffmanSize = huffmanOutput.size();
1292 }
1293 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1294
1295 //only copy if not larger than not-compressed size
1296 if (totalSize < numRows*sizeOfElems*numRowElems)
1297 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1298
1299 return totalSize;
1300}
1301
1302/****************************************************************
1303 * COMPRESS BUFFER
1304 ****************************************************************/
1305uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1306{
1307 uint32_t colWidth = numRowElems;
1308 for (int j=colWidth*numRows-1;j>1;j--)
1309 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1310 //call the huffman transposed
1311 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1312}
1313
1314uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1315{
1316 uint32_t colWidth = numRowElems;
1317 for (int j=colWidth*numRows-1;j>1;j--)
1318 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1319
1320 return numRows*sizeOfElems*numRowElems;
1321}
1322/****************************************************************
1323 * COMPRESS BUFFER
1324 ****************************************************************/
1325uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1326{
1327 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1328 uint32_t offset=0;
1329 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1330 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1331
1332 //now compress each column one by one by calling compression on arrays
1333 for (uint32_t i=0;i<_columns.size();i++)
1334 {
1335 _catalog[currentCatalogRow][i].second = compressedOffset;
1336
1337 if (_columns[i].numElems() == 0) continue;
1338
1339 BlockHeader& head = _columns[i].getBlockHeader();
1340 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1341 //set the default byte telling if uncompressed the compressed Flag
1342 uint64_t previousOffset = compressedOffset;
1343 //skip header data
1344 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1345
1346 for (uint32_t j=0;j<sequence.size(); j++)
1347 {
1348 switch (sequence[j])
1349 {
1350 case zfits::kFactRaw:
1351// if (head.numProcs == 1)
1352 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1353 break;
1354 case zfits::kFactSmoothing:
1355 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1356 break;
1357 case zfits::kFactHuffman16:
1358 if (head.ordering == zfits::kOrderByCol)
1359 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1360 else
1361 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1362 break;
1363 default:
1364 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1365 break;
1366 }
1367 }
1368
1369 //check if compressed size is larger than uncompressed
1370 if (sequence[0] != zfits::kFactRaw &&
1371 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1372 {//if so set flag and redo it uncompressed
1373 cout << "REDOING UNCOMPRESSED" << endl;
1374 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1375 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1376 BlockHeader he;
1377 he.size = compressedOffset - previousOffset;
1378 he.numProcs = 1;
1379 he.ordering = zfits::kOrderByRow;
1380 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1381 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
1382 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1383 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1384 continue;
1385 }
1386 head.size = compressedOffset - previousOffset;
1387 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1388 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1389
1390 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1391 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1392 }
1393
1394 TileHeader tHead(thisRoundNumRows, compressedOffset);
1395 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1396 return compressedOffset;
1397}
1398
1399/****************************************************************
1400 * WRITE COMPRESS DATA TO DISK
1401 ****************************************************************/
1402bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1403{
1404 char* checkSumPointer = _compressedBuffer[threadID];
1405 int32_t extraBytes = 0;
1406 uint32_t sizeToChecksum = sizeToWrite;
1407 if (_checkOffset != 0)
1408 {//should we extend the array to the left ?
1409 sizeToChecksum += _checkOffset;
1410 checkSumPointer -= _checkOffset;
1411 memset(checkSumPointer, 0, _checkOffset);
1412 }
1413 if (sizeToChecksum%4 != 0)
1414 {//should we extend the array to the right ?
1415 extraBytes = 4 - (sizeToChecksum%4);
1416 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1417 sizeToChecksum += extraBytes;
1418 }
1419 //do the checksum
1420 _checksum.add(checkSumPointer, sizeToChecksum);
1421// cout << endl << "Checksum: " << _checksum.val() << endl;
1422 _checkOffset = (4 - extraBytes)%4;
1423 //write data to disk
1424 _file.write(_compressedBuffer[threadID], sizeToWrite);
1425 return _file.good();
1426}
1427
1428/****************************************************************
1429 * WRITER THREAD LOOP
1430 ****************************************************************/
1431void* CompressedFitsWriter::threadFunction(void* context)
1432{
1433 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1434
1435 uint32_t myID = 0;
1436 pthread_mutex_lock(&(myself->_mutex));
1437 myID = myself->_threadIndex++;
1438 pthread_mutex_unlock(&(myself->_mutex));
1439 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1440
1441 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1442 {
1443 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1444 usleep(100000);
1445 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1446 continue;
1447 uint32_t numBytes = myself->compressBuffer(myID);
1448 myself->_threadStatus[myID] = _THREAD_WRITE_;
1449
1450 //wait for the previous data to be written
1451 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1452 usleep(1000);
1453 //do the actual writing to disk
1454 pthread_mutex_lock(&(myself->_mutex));
1455 myself->writeCompressedDataToDisk(myID, numBytes);
1456 myself->_threadIndex = myID;
1457 pthread_mutex_unlock(&(myself->_mutex));
1458 myself->_threadStatus[myID] = _THREAD_WAIT_;
1459 }
1460 return NULL;
1461}
1462
1463/****************************************************************
1464 * PRINT USAGE
1465 ****************************************************************/
1466void printUsage()
1467{
1468 cout << endl;
1469 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1470 " and compresses it.\n It can use a drs calibration in order to"
1471 " improve the compression ratio. If so, the input calibration"
1472 " is embedded into the compressed file.\n"
1473 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1474 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1475 "Usage: Compressed_Fits_Test <inputFile>";
1476 cout << endl;
1477}
1478
1479/****************************************************************
1480 * PRINT HELP
1481 ****************************************************************/
1482void printHelp()
1483{
1484 cout << endl;
1485 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1486 "The fz extension will be added, replacing the .gz one if required \n"
1487 "If output is specified, then it will replace the automatically generated output filename\n"
1488 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1489 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1490 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1491 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1492 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1493 "UNCOMPRESSED\n"
1494 "AMPLITUDE\n"
1495 "HUFFMAN\n"
1496 "SMOOTHMAN\n"
1497 "INT_WAVELET\n"
1498 "\n"
1499 "--quiet removes any textual output, except error messages\n"
1500 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1501 ;
1502 cout << endl << endl;
1503}
1504
1505/****************************************************************
1506 * SETUP CONFIGURATION
1507 ****************************************************************/
1508void setupConfiguration(Configuration& conf)
1509{
1510 po::options_description configs("FitsCompressor options");
1511 configs.add_options()
1512 ("inputFile,i", vars<string>(), "Input file")
1513 ("drs,d", var<string>(), "Input drs calibration file")
1514 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1515 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1516 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1517 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1518 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1519 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1520 ;
1521 po::positional_options_description positional;
1522 positional.add("inputFile", -1);
1523 conf.AddOptions(configs);
1524 conf.SetArgumentPositions(positional);
1525}
1526
1527/****************************************************************
1528 * MAIN
1529 ****************************************************************/
1530int main(int argc, const char** argv)
1531{
1532 Configuration conf(argv[0]);
1533 conf.SetPrintUsage(printUsage);
1534 setupConfiguration(conf);
1535
1536 if (!conf.DoParse(argc, argv, printHelp))
1537 return -1;
1538
1539 //initialize the file names to nothing.
1540 string fileNameIn = "";
1541 string fileNameOut = "";
1542 string drsFileName = "";
1543 uint32_t numRowsPerTile = 100;
1544 bool displayText=true;
1545
1546 //parse configuration
1547 if (conf.Get<bool>("quiet")) displayText = false;
1548 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1549 if (inputFileNameVec.size() != 1)
1550 {
1551 cout << "Error: ";
1552 if (inputFileNameVec.size() == 0) cout << "no";
1553 else cout << inputFileNameVec.size();
1554 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1555 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1556 cout << inputFileNameVec[i] << endl;
1557 return -1;
1558 }
1559
1560 //Assign the input filename
1561 fileNameIn = inputFileNameVec[0];
1562
1563 //Check if we have a drs calib too
1564 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1565
1566 //Should we verify the data ?
1567 bool verifyDataPlease = false;
1568 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1569
1570
1571 //should we use a specific output filename ?
1572 if (conf.Has("output"))
1573 fileNameOut = conf.Get<string>("output");
1574 else
1575 {
1576 size_t pos = fileNameIn.find(".fits.fz");
1577 if (pos == string::npos)
1578 {
1579 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1580 return -1;
1581 }
1582 fileNameOut = fileNameIn.substr(0, pos) + ".fits";
1583 }
1584
1585
1586 //should we use specific compression on some columns ?
1587 const vector<string> columnsCompression = conf.Vec<string>("compression");
1588
1589 //split up values between column names and compression scheme
1590 vector<std::pair<string, string>> compressions;
1591 for (unsigned int i=0;i<columnsCompression.size();i++)
1592 {
1593 size_t pos = columnsCompression[i].find_first_of("=");
1594 if (pos == string::npos)
1595 {
1596 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1597 return -1;
1598 }
1599 string comp = columnsCompression[i].substr(pos+1);
1600 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1601 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1602 {
1603 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1604 return -1;
1605 }
1606 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1607 }
1608
1609 //How many rows per tile should we use ?
1610 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1611
1612 //////////////////////////////////////////////////////////////////////////////////////
1613 // Done reading configuration. Open relevant files
1614 //////////////////////////////////////////////////////////////////////////////////////
1615
1616
1617 //Open input's fits file
1618 factfits inFile(fileNameIn, "", "Events", false);
1619
1620 if (!inFile.IsCompressedFITS())
1621 {
1622 cout << "ERROR: input file is NOT a compressed fits. Cannot be decompressed: Aborting." << endl;
1623 return -1;
1624 }
1625
1626 //decide how many tiles should be put in the compressed file
1627 uint32_t originalNumRows = inFile.GetNumRows();
1628 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1629// CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1630
1631 //should we use a specific number of threads for compressing ?
1632 unsigned int numThreads = 1;
1633 if (conf.Has("threads"))
1634 {
1635 numThreads = conf.Get<unsigned int>("threads");
1636// outFile.setNumWorkingThreads(numThreads);
1637 }
1638
1639
1640
1641 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1642 factfits* drsFile = NULL;
1643 //try to open the Drs file. If any.
1644 if (drsFileName != "")
1645 {
1646 try
1647 {
1648 drsFile = new factfits(drsFileName);
1649 }
1650 catch (...)
1651 {
1652 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1653 return -1;
1654 }
1655 }
1656
1657 if (displayText)
1658 {
1659 cout << endl;
1660 cout << "**********************" << endl;
1661 cout << "Will decompress from : " << fileNameIn << endl;
1662 cout << "to : " << fileNameOut << endl;
1663 cout << "**********************" << endl;
1664 cout << endl;
1665 }
1666
1667 //////////////////////////////////////////////////////////////////////////////////////
1668 // Done opening input files. Allocate memory and configure output file
1669 //////////////////////////////////////////////////////////////////////////////////////
1670
1671 //allocate the buffer for temporary storage of each read/written row
1672 uint32_t rowWidth = inFile.GetUInt("ZNAXIS1");
1673 char* buffer = new char[rowWidth + 12];
1674 memset(buffer, 0, 4);
1675 buffer = buffer+4;
1676
1677 //get the source columns
1678 const fits::Table::Columns& columns = inFile.GetColumns();
1679 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1680 if (displayText)
1681 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1682
1683
1684 //////////////////////////////////////////////////////////////////////////////////////
1685 // Done configuring compression. Do the real job now !
1686 //////////////////////////////////////////////////////////////////////////////////////
1687 vector<void*> readPointers;
1688 vector<int32_t> readOffsets;
1689 vector<int32_t> readElemSize;
1690 vector<int32_t> readNumElems;
1691 //Get table name for later use in case the compressed file is to be verified
1692 string tableName = inFile.GetStr("EXTNAME");
1693
1694
1695 //and the header of the compressed file
1696 const fits::Table::Keys& header2 = inFile.GetKeys();
1697
1698 //get a non-compressed writer
1699 ofits reconstructedFile;
1700
1701 //figure out its name: /dev/null unless otherwise specified
1702 string reconstructedName = fileNameOut;
1703 reconstructedFile.open(reconstructedName.c_str(), false);
1704
1705 //reconstruct the original columns from the compressed file.
1706 string origChecksumStr;
1707 string origDatasum;
1708
1709 //reset tablename value so that it is re-read from compressed table's header
1710 tableName = "";
1711
1712 /************************************************************************************
1713 * Reconstruction setup done. Rebuild original header
1714 ************************************************************************************/
1715
1716 //re-tranlate the keys
1717 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1718 {
1719 string k = it->first;
1720 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1721 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1722 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1723 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER" || k == "ZHEAP")
1724 {
1725 continue;
1726 }
1727
1728 if (k == "ZCHKSUM")
1729 {
1730 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1731 origChecksumStr = it->second.value;
1732 continue;
1733 }
1734 if (k == "RAWSUM")
1735 {
1736 continue;
1737 }
1738
1739 if (k == "ZDTASUM")
1740 {
1741 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1742 origDatasum = it->second.value;
1743 continue;
1744 }
1745
1746 if (k == "EXTNAME")
1747 {
1748 tableName = it->second.value;
1749 }
1750
1751 k = k.substr(0,5);
1752
1753 if (k == "TTYPE")
1754 {//we have an original column name here.
1755 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1756 continue;
1757 }
1758
1759 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1760 {
1761 continue;
1762 }
1763
1764 if (k == "ZFORM" || k == "ZTYPE")
1765 {
1766 string tmpKey = it->second.fitsString;
1767 tmpKey[0] = 'T';
1768 reconstructedFile.SetKeyFromFitsString(tmpKey);
1769 continue;
1770 }
1771
1772 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1773 }
1774
1775 if (tableName == "")
1776 {
1777 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1778 return -1;
1779 }
1780
1781 //Restore the original columns
1782 for (uint32_t numCol=1; numCol<10000; numCol++)
1783 {
1784 ostringstream str;
1785 str << numCol;
1786 if (!inFile.HasKey("TTYPE"+str.str())) break;
1787
1788 string ttype = inFile.GetStr("TTYPE"+str.str());
1789 string tform = inFile.GetStr("ZFORM"+str.str());
1790 char type = tform[tform.size()-1];
1791 string number = tform.substr(0, tform.size()-1);
1792 int numElems = atoi(number.c_str());
1793
1794 if (number == "") numElems=1;
1795
1796 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1797 }
1798
1799 reconstructedFile.WriteTableHeader(tableName.c_str());
1800
1801 /************************************************************************************
1802 * Original header restored. Do the data
1803 ************************************************************************************/
1804
1805 //set pointers to the readout data to later be able to gather it to "buffer".
1806 readPointers.clear();
1807 readOffsets.clear();
1808 readElemSize.clear();
1809 readNumElems.clear();
1810 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1811 {
1812 readPointers.push_back(inFile.SetPtrAddress(it->first));
1813 readOffsets.push_back(it->second.offset);
1814 readElemSize.push_back(it->second.size);
1815 readNumElems.push_back(it->second.num);
1816 }
1817
1818 //do the actual reconstruction work
1819 uint32_t i=1;
1820 while (i<=inFile.GetNumRows() && inFile.GetNextRow())
1821 {
1822 int count=0;
1823 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1824 {
1825 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1826 count++;
1827 }
1828 if (displayText) cout << "\r Row " << i << flush;
1829 reconstructedFile.WriteRow(buffer, rowWidth);
1830 if (!reconstructedFile.good())
1831 {
1832 cout << "ERROR: no space left on device (probably)" << endl;
1833 return -1;
1834 }
1835 i++;
1836 }
1837
1838 if (displayText) cout << endl;
1839
1840 //close reconstruction input and output
1841// Do NOT close the verify file, otherwise data cannot be flushed to copy file
1842// verifyFile.close();
1843 if (!inFile.IsFileOk())
1844 cout << "ERROR: file checksums seems wrong" << endl;
1845
1846 if (!reconstructedFile.close())
1847 {
1848 cout << "ERROR: disk probably full..." <<endl;
1849 return -1;
1850 }
1851
1852 //get original and reconstructed checksum and datasum
1853 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
1854 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
1855
1856 //verify that no mistake was made
1857 if (origChecksum.second != newChecksum.second)
1858 {
1859 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
1860 return -1;
1861 }
1862 if (origChecksum.first != newChecksum.first)
1863 {
1864 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
1865 }
1866 else
1867 {
1868 if (true) cout << "Ok" << endl;
1869 }
1870
1871 buffer = buffer-4;
1872 delete[] buffer;
1873 return 0;
1874}
1875
Note: See TracBrowser for help on using the repository browser.