source: branches/fscctrl_safety_limits/src/fitsDecompressor.cc@ 19166

Last change on this file since 19166 was 17675, checked in by lyard, 11 years ago
modified the fitsDecompressor code to take into account the refactoring of the fits classes, namely the introduction of FITS.h
File size: 77.6 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22class CompressedFitsFile
23{
24public:
25 class HeaderEntry
26 {
27 public:
28 /**
29 * Default constructor
30 */
31 HeaderEntry(): _key(""),
32 _value(""),
33 _comment(""),
34 _fitsString("")
35 {
36 }
37
38 /**
39 * Regular constructor.
40 * @param key the name of the keyword entry
41 * @param value its value
42 * @param comment an optionnal comment to be placed after the value
43 */
44 template<typename T>
45 HeaderEntry(const string& k,
46 const T& val,
47 const string& comm) : _key(k),
48 _value(""),
49 _comment(comm),
50 _fitsString("")
51 {
52 setValue(val);
53 }
54
55 /**
56 * From fits.h
57 */
58 string Trim(const string &str, char c=' ')
59 {
60 // Trim Both leading and trailing spaces
61 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
62 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
63
64 // if all spaces or empty return an empty string
65 if (string::npos==pstart || string::npos==pend)
66 return string();
67
68 return str.substr(pstart, pend-pstart+1);
69 }
70 /**
71 * Constructor from the original fits entry
72 */
73 HeaderEntry(const string& line)
74 {
75 _fitsString = line.data();
76
77 //parse the line
78 _key = Trim(line.substr(0,8));
79 //COMMENT and/or HISTORY values
80 if (line.substr(8,2)!= "= ")
81 {
82 _value = "";
83 _comment = Trim(line.substr(10));
84 return;
85 }
86 string next=line.substr(10);
87 const size_t slash = next.find_first_of('/');
88 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
89 _comment = Trim(next.substr(slash+1));
90 }
91 /**
92 * Alternative constroctor from the fits entry
93 */
94 HeaderEntry(const vector<char>& lineVec)
95 {
96 HeaderEntry(string(lineVec.data()));
97 }
98 /**
99 * Destructor
100 */
101 virtual ~HeaderEntry(){}
102
103 const string& value() const {return _value;}
104 const string& key() const {return _key;}
105 const string& comment() const {return _comment;}
106 const string& fitsString() const {return _fitsString;}
107
108 /**
109 * Set a keyword value.
110 * @param value The value to be set
111 * @param update whether the value already exist or not. To be modified soon.
112 */
113 template<typename T>
114 void setValue(const T& val)
115 {
116 ostringstream str;
117 str << val;
118 _value = str.str();
119 buildFitsString();
120 };
121 /**
122 * Set the comment for a given entry
123 */
124 void setComment(const string& comm)
125 {
126 _comment = comm;
127 buildFitsString();
128 }
129
130 private:
131 /**
132 * Construct the FITS header string from the key, value and comment
133 */
134 void buildFitsString()
135 {
136 ostringstream str;
137 unsigned int totSize = 0;
138
139 // Tuncate the key if required
140 if (_key.length() > 8)
141 {
142 str << _key.substr(0, 8);
143 totSize += 8;
144 }
145 else
146 {
147 str << _key;
148 totSize += _key.length();
149 }
150
151 // Append space if key is less than 8 chars long
152 for (int i=totSize; i<8;i++)
153 {
154 str << " ";
155 totSize++;
156 }
157
158 // Add separator
159 str << "= ";
160 totSize += 2;
161
162 // Format value
163 if (_value.length() < 20)
164 for (;totSize<30-_value.length();totSize++)
165 str << " ";
166
167 if (_value.length() > 70)
168 {
169 str << _value.substr(0,70);
170 totSize += 70;
171 }
172 else
173 {
174 str << _value;
175 totSize += _value.size();
176 }
177
178 // If there is space remaining, add comment area
179 if (totSize < 77)
180 {
181 str << " / ";
182 totSize += 3;
183 if (totSize < 80)
184 {
185 unsigned int commentSize = 80 - totSize;
186 if (_comment.length() > commentSize)
187 {
188 str << _comment.substr(0,commentSize);
189 totSize += commentSize;
190 }
191 else
192 {
193 str << _comment;
194 totSize += _comment.length();
195 }
196 }
197 }
198
199 // If there is yet some free space, fill up the entry with spaces
200 for (int i=totSize; i<80;i++)
201 str << " ";
202
203 _fitsString = str.str();
204
205 // Check for correct completion
206 if (_fitsString.length() != 80)
207 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
208 }
209
210 string _key; ///< the key (name) of the header entry
211 string _value; ///< the value of the header entry
212 string _comment; ///< the comment associated to the header entry
213 string _fitsString; ///< the string that will be written to the fits file
214 };
215
216 /**
217 * Supported compressions
218 */
219 typedef enum
220 {
221 UNCOMPRESSED,
222 SMOOTHMAN
223 } FitsCompression;
224
225 /**
226 * Columns class
227 */
228 class ColumnEntry
229 {
230 public:
231 /**
232 * Default constructor
233 */
234 ColumnEntry();
235
236 /**
237 * Default destructor
238 */
239 virtual ~ColumnEntry(){}
240
241 /**
242 * Constructor from values
243 * @param n the column name
244 * @param t the column type
245 * @param numof the number of entries in the column
246 * @param comp the compression type for this column
247 */
248 ColumnEntry(const string& n,
249 char t,
250 int numOf,
251 BlockHeader& head,
252 vector<uint16_t>& seq) : _name(n),
253 _num(numOf),
254 _typeSize(0),
255 _offset(0),
256 _type(t),
257 _description(""),
258 _header(head),
259 _compSequence(seq)
260 {
261 switch (t)
262 {
263 case 'L':
264 case 'A':
265 case 'B': _typeSize = 1; break;
266 case 'I': _typeSize = 2; break;
267 case 'J':
268 case 'E': _typeSize = 4; break;
269 case 'K':
270 case 'D': _typeSize = 8; break;
271 default:
272 cout << "Error: typename " << t << " missing in the current implementation" << endl;
273 };
274
275 ostringstream str;
276 str << "data format of field: ";
277
278 switch (t)
279 {
280 case 'L': str << "1-byte BOOL"; break;
281 case 'A': str << "1-byte CHAR"; break;
282 case 'B': str << "BYTE"; break;
283 case 'I': str << "2-byte INTEGER"; break;
284 case 'J': str << "4-byte INTEGER"; break;
285 case 'K': str << "8-byte INTEGER"; break;
286 case 'E': str << "4-byte FLOAT"; break;
287 case 'D': str << "8-byte FLOAT"; break;
288 }
289
290 _description = str.str();
291 }
292
293 const string& name() const { return _name;};
294 int width() const { return _num*_typeSize;};
295 int offset() const { return _offset; };
296 int numElems() const { return _num; };
297 int sizeOfElems() const { return _typeSize;};
298 void setOffset(int off) { _offset = off;};
299 char type() const { return _type;};
300 string getDescription() const { return _description;}
301 BlockHeader& getBlockHeader() { return _header;}
302 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
303 const char& getColumnOrdering() const { return _header.ordering;}
304
305
306 string getCompressionString() const
307 {
308 return "FACT";
309 /*
310 ostringstream str;
311 for (uint32_t i=0;i<_compSequence.size();i++)
312 switch (_compSequence[i])
313 {
314 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
315 case FACT_SMOOTHING: str << "SMOOTHING "; break;
316 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
317 };
318 return str.str();*/
319 }
320
321 private:
322
323 string _name; ///< name of the column
324 int _num; ///< number of elements contained in one row of this column
325 int _typeSize; ///< the number of bytes taken by one element
326 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
327 char _type; ///< the type of the column, as specified by the fits documentation
328 string _description; ///< a description for the column. It will be placed in the header
329 BlockHeader _header;
330 vector<uint16_t> _compSequence;
331 };
332
333 public:
334 ///@brief default constructor. Assigns a default number of rows and tiles
335 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
336
337 ///@brief default destructor
338 virtual ~CompressedFitsFile();
339
340 ///@brief get the header of the file
341 vector<HeaderEntry>& getHeaderEntries() { return _header;}
342
343 protected:
344 ///@brief protected function to allocate the intermediate buffers
345 bool reallocateBuffers();
346
347 //FITS related stuff
348 vector<HeaderEntry> _header; ///< Header keys
349 vector<ColumnEntry> _columns; ///< Columns in the file
350 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
351 uint32_t _numRowsPerTile; ///< Number of rows per tile
352 uint32_t _totalNumRows; ///< Total number of raws
353 uint32_t _rowWidth; ///< Total number of bytes in one row
354 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
355 char* _buffer; ///< Memory buffer to store rows while they are not compressed
356 Checksum _checksum; ///< Checksum for asserting the consistency of the data
357 fstream _file; ///< The actual file streamer for accessing disk data
358
359 //compression related stuff
360 typedef pair<int64_t, int64_t> CatalogEntry;
361 typedef vector<CatalogEntry> CatalogRow;
362 typedef vector<CatalogRow> CatalogType;
363 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
364 uint64_t _heapPtr; ///< the address in the file of the heap area
365 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
366 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
367
368 //thread related stuff
369 uint32_t _numThreads; ///< The number of threads that will be used to compress
370 uint32_t _threadIndex; ///< A variable to assign threads indices
371 vector<pthread_t> _thread; ///< The thread handler of the compressor
372 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
373 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
374
375 //thread states. Not all used, but they do not hurt
376 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
377 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
378 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
379 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
380 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
381 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
382
383 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
384};
385
386class CompressedFitsWriter : public CompressedFitsFile
387{
388 public:
389 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
390 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
391
392 ///@brief default destructor
393 virtual ~CompressedFitsWriter();
394
395 ///@brief add one column to the file
396 bool addColumn(const ColumnEntry& column);
397
398 ///@brief sets a given header key
399 bool setHeaderKey(const HeaderEntry&);
400
401 bool changeHeaderKey(const string& origName, const string& newName);
402
403 ///@brief open a new fits file
404 bool open(const string& fileName, const string& tableName="Data");
405
406 ///@brief close the opened file
407 bool close();
408
409 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
410 bool writeBinaryRow(const char* bufferToWrite);
411
412 uint32_t getRowWidth();
413
414 ///@brief assign a given (already loaded) drs calibration
415 void setDrsCalib(int16_t* data);
416
417 ///@brief set the number of worker threads compressing the data.
418 bool setNumWorkingThreads(uint32_t num);
419
420 private:
421 ///@brief compresses one buffer of data, the one given by threadIndex
422 uint64_t compressBuffer(uint32_t threadIndex);
423
424 ///@brief writes an already compressed buffer to disk
425 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
426
427 ///@brief add the header checksum to the datasum
428 void addHeaderChecksum(Checksum& checksum);
429
430 ///@brief write the header. If closingFile is set to true, checksum is calculated
431 void writeHeader(bool closingFile = false);
432
433 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
434 void writeCatalog(bool closingFile=false);
435
436 /// FIXME this was a bad idea. Move everything to the regular header
437 vector<HeaderEntry> _defaultHeader;
438
439 /// the main function compressing the data
440 static void* threadFunction(void* context);
441
442 /// Write the drs calibration to disk, if any
443 void writeDrsCalib();
444
445 /// Copy and transpose (or semi-transpose) one tile of data
446 void copyTransposeTile(uint32_t index);
447
448 /// Specific compression functions
449 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
450 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
451 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
452 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
453
454 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
455 int16_t* _drsCalibData; ///< array of the Drs baseline mean
456 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
457 pthread_mutex_t _mutex; ///< mutex for compressing threads
458
459
460 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
461 static string _fitsHeader; ///< the default header to be written in every fits file
462};
463
464const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
465const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
466const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
467const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
468const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
469const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
470
471template<>
472void CompressedFitsFile::HeaderEntry::setValue(const string& v)
473{
474 string val = v;
475 if (val.size() > 2 && val[0] == '\'')
476 {
477 size_t pos = val.find_last_of("'");
478 if (pos != string::npos && pos != 0)
479 val = val.substr(1, pos-1);
480 }
481 ostringstream str;
482
483 str << "'" << val << "'";
484 for (int i=str.str().length(); i<20;i++)
485 str << " ";
486 _value = str.str();
487 buildFitsString();
488}
489
490/**
491 * Default header to be written in all fits files
492 */
493string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
494 "BITPIX = 8 / number of bits per data pixel "
495 "NAXIS = 0 / number of data axes "
496 "EXTEND = T / FITS dataset may contain extensions "
497 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
498 "DATASUM = ' 0' / Checksum for the data block "
499 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
500 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
501 "END "
502 " "
503 " "
504 " "
505 " "
506 " "
507 " "
508 " "
509 " "
510 " "
511 " "
512 " "
513 " "
514 " "
515 " "
516 " "
517 " "
518 " "
519 " "
520 " "
521 " "
522 " "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " ";
529
530/**
531 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
532 *
533 */
534string CompressedFitsWriter::_emptyBlock = " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " "
550 " "
551 " "
552 " "
553 " "
554 " "
555 " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " ";
570
571CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
572/****************************************************************
573 * SUPER CLASS DEFAULT CONSTRUCTOR
574 ****************************************************************/
575CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
576 uint32_t numRowsPerTile) : _header(),
577 _columns(),
578 _numTiles(numTiles),
579 _numRowsPerTile(numRowsPerTile),
580 _totalNumRows(0),
581 _rowWidth(0),
582 _headerFlushed(false),
583 _buffer(NULL),
584 _checksum(0),
585 _heapPtr(0),
586 _transposedBuffer(1),
587 _compressedBuffer(1),
588 _numThreads(1),
589 _threadIndex(0),
590 _thread(1),
591 _threadNumRows(1),
592 _threadStatus(1)
593{
594 _catalog.resize(_numTiles);
595 _transposedBuffer[0] = NULL;
596 _compressedBuffer[0] = NULL;
597 _threadStatus[0] = _THREAD_WAIT_;
598 _threadNumRows[0] = 0;
599}
600
601/****************************************************************
602 * SUPER CLASS DEFAULT DESTRUCTOR
603 ****************************************************************/
604CompressedFitsFile::~CompressedFitsFile()
605{
606 if (_buffer != NULL)
607 {
608 _buffer = _buffer-4;
609 delete[] _buffer;
610 _buffer = NULL;
611 for (uint32_t i=0;i<_numThreads;i++)
612 {
613 _compressedBuffer[i] = _compressedBuffer[i]-4;
614 delete[] _transposedBuffer[i];
615 delete[] _compressedBuffer[i];
616 _transposedBuffer[i] = NULL;
617 _compressedBuffer[i] = NULL;
618 }
619 }
620 if (_file.is_open())
621 _file.close();
622}
623
624/****************************************************************
625 * REALLOCATE BUFFER
626 ****************************************************************/
627bool CompressedFitsFile::reallocateBuffers()
628{
629 if (_buffer)
630 {
631 _buffer = _buffer - 4;
632 delete[] _buffer;
633 for (uint32_t i=0;i<_compressedBuffer.size();i++)
634 {
635 _compressedBuffer[i] = _compressedBuffer[i]-4;
636 delete[] _transposedBuffer[i];
637 delete[] _compressedBuffer[i];
638 }
639 }
640 _buffer = new char[_rowWidth*_numRowsPerTile + 12];
641 if (_buffer == NULL) return false;
642 memset(_buffer, 0, 4);
643 _buffer = _buffer + 4;
644 if (_compressedBuffer.size() != _numThreads)
645 {
646 _transposedBuffer.resize(_numThreads);
647 _compressedBuffer.resize(_numThreads);
648 }
649 for (uint32_t i=0;i<_numThreads;i++)
650 {
651 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
652 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader) + 12]; //use a bit more memory for compression flags and checksumming
653 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
654 return false;
655 //shift the compressed buffer by 4 bytes, for checksum calculation
656 memset(_compressedBuffer[i], 0, 4);
657 _compressedBuffer[i] = _compressedBuffer[i]+4;
658 //initialize the tile header
659 TileHeader tileHeader;
660 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
661 }
662 return true;
663}
664
665/****************************************************************
666 * DEFAULT WRITER CONSTRUCTOR
667 ****************************************************************/
668CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
669 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
670 _checkOffset(0),
671 _drsCalibData(NULL),
672 _threadLooper(0)
673{
674 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
675 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
676 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
677 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
678 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
679 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
680 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
681 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
682 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
683 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
684 //compression stuff
685 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
686 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
687 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
688 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
689 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
690 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
691 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
692
693 pthread_mutex_init(&_mutex, NULL);
694}
695
696/****************************************************************
697 * DEFAULT DESTRUCTOR
698 ****************************************************************/
699CompressedFitsWriter::~CompressedFitsWriter()
700{
701 pthread_mutex_destroy(&_mutex);
702}
703
704/****************************************************************
705 * SET THE POINTER TO THE DRS CALIBRATION
706 ****************************************************************/
707void CompressedFitsWriter::setDrsCalib(int16_t* data)
708{
709 _drsCalibData = data;
710}
711
712/****************************************************************
713 * SET NUM WORKING THREADS
714 ****************************************************************/
715bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
716{
717 if (_file.is_open())
718 return false;
719 if (num < 1 || num > 64)
720 {
721 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
722 return false;
723 }
724 _numThreads = num;
725 _transposedBuffer[0] = NULL;
726 _compressedBuffer[0] = NULL;
727 _threadStatus.resize(num);
728 _thread.resize(num);
729 _threadNumRows.resize(num);
730 for (uint32_t i=0;i<num;i++)
731 {
732 _threadNumRows[i] = 0;
733 _threadStatus[i] = _THREAD_WAIT_;
734 }
735 return reallocateBuffers();
736}
737
738/****************************************************************
739 * WRITE DRS CALIBRATION TO FILE
740 ****************************************************************/
741void CompressedFitsWriter::writeDrsCalib()
742{
743 //if file was not loaded, ignore
744 if (_drsCalibData == NULL)
745 return;
746 uint64_t whereDidIStart = _file.tellp();
747 vector<HeaderEntry> header;
748 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
749 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
750 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
751 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
752 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
753 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
754 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
755 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
756 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
757 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
758 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
759 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
760 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
761
762 for (uint32_t i=0;i<header.size();i++)
763 _file.write(header[i].fitsString().c_str(), 80);
764 //End the header
765 _file.write("END ", 80);
766 long here = _file.tellp();
767 if (here%2880)
768 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
769 //now write the data itself
770 int16_t* swappedBytes = new int16_t[1024];
771 Checksum checksum;
772 for (int32_t i=0;i<1440;i++)
773 {
774 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
775 for (int32_t j=0;j<2048;j+=2)
776 {
777 int8_t inter;
778 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
779 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
780 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
781 }
782 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
783 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
784 }
785 uint64_t whereDidIStop = _file.tellp();
786 delete[] swappedBytes;
787 //No need to pad the data, as (1440*1024*2)%2880==0
788
789 //calculate the checksum from the header
790 ostringstream str;
791 str << checksum.val();
792 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
793 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
794 checksum.add(it->fitsString().c_str(), 80);
795 string end("END ");
796 string space(" ");
797 checksum.add(end.c_str(), 80);
798 int headerRowsLeft = 36 - (header.size() + 1)%36;
799 for (int i=0;i<headerRowsLeft;i++)
800 checksum.add(space.c_str(), 80);
801 //udpate the checksum keyword
802 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
803 //and eventually re-write the header data
804 _file.seekp(whereDidIStart);
805 for (uint32_t i=0;i<header.size();i++)
806 _file.write(header[i].fitsString().c_str(), 80);
807 _file.seekp(whereDidIStop);
808}
809
810/****************************************************************
811 * ADD COLUMN
812 ****************************************************************/
813bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
814{
815 if (_totalNumRows != 0)
816 {
817 cout << "Error: cannot add new columns once first row has been written" << endl;
818 return false;
819 }
820 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
821 {
822 if (it->name() == column.name())
823 {
824 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
825 return false;
826 }
827 }
828 _columns.push_back(column);
829 _columns.back().setOffset(_rowWidth);
830 _rowWidth += column.width();
831 reallocateBuffers();
832
833 ostringstream str, str2, str3;
834 str << "TTYPE" << _columns.size();
835 str2 << column.name();
836 str3 << "label for field ";
837 if (_columns.size() < 10) str3 << " ";
838 if (_columns.size() < 100) str3 << " ";
839 str3 << _columns.size();
840 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
841 str.str("");
842 str2.str("");
843 str3.str("");
844 str << "TFORM" << _columns.size();
845 str2 << "1QB";
846 str3 << "data format of field " << _columns.size();
847 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
848 str.str("");
849 str2.str("");
850 str3.str("");
851 str << "ZFORM" << _columns.size();
852 str2 << column.numElems() << column.type();
853 str3 << "Original format of field " << _columns.size();
854 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
855 str.str("");
856 str2.str("");
857 str3.str("");
858 str << "ZCTYP" << _columns.size();
859 str2 << column.getCompressionString();
860 str3 << "Comp. Scheme of field " << _columns.size();
861 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
862 //resize the catalog vector accordingly
863 for (uint32_t i=0;i<_numTiles;i++)
864 {
865 _catalog[i].resize(_columns.size());
866 for (uint32_t j=0;j<_catalog[i].size();j++)
867 _catalog[i][j] = make_pair(0,0);
868 }
869 return true;
870}
871
872/****************************************************************
873 * SET HEADER KEY
874 ****************************************************************/
875bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
876{
877 HeaderEntry ent = entry;
878 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
879 {
880 if (it->key() == entry.key())
881 {
882 if (entry.comment() == "")
883 ent.setComment(it->comment());
884 (*it) = ent;
885 _headerFlushed = false;
886 return true;
887 }
888 }
889 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
890 {
891 if (it->key() == entry.key())
892 {
893 if (entry.comment() == "")
894 ent.setComment(it->comment());
895 (*it) = ent;
896 _headerFlushed = false;
897 return true;
898 }
899 }
900 if (_totalNumRows != 0)
901 {
902 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
903 return false;
904 }
905 _header.push_back(entry);
906 _headerFlushed = false;
907 return true;
908}
909
910bool CompressedFitsWriter::changeHeaderKey(const string& origName, const string& newName)
911{
912 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
913 {
914 if (it->key() == origName)
915 {
916 (*it) = HeaderEntry(newName, it->value(), it->comment());
917 _headerFlushed = false;
918 return true;
919 }
920 }
921 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
922 {
923 if (it->key() == origName)
924 {
925 (*it) = HeaderEntry(newName, it->value(), it->comment());
926 _headerFlushed = false;
927 return true;
928 }
929 }
930 return false;
931}
932/****************************************************************
933 * OPEN
934 ****************************************************************/
935bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
936{
937 _file.open(fileName.c_str(), ios_base::out);
938 if (!_file.is_open())
939 {
940 cout << "Error: Could not open the file (" << fileName << ")." << endl;
941 return false;
942 }
943 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
944 _headerFlushed = false;
945 _threadIndex = 0;
946 //create requested number of threads
947 for (uint32_t i=0;i<_numThreads;i++)
948 pthread_create(&(_thread[i]), NULL, threadFunction, this);
949 //wait for the threads to start
950 while (_numThreads != _threadIndex)
951 usleep(1000);
952 //set the writing fence to the last thread
953 _threadIndex = _numThreads-1;
954 return (_file.good());
955}
956
957/****************************************************************
958 * WRITE HEADER
959 ****************************************************************/
960void CompressedFitsWriter::writeHeader(bool closingFile)
961{
962 if (_headerFlushed)
963 return;
964 if (!_file.is_open())
965 return;
966
967 long cPos = _file.tellp();
968
969 _file.seekp(0);
970
971 _file.write(_fitsHeader.c_str(), 2880);
972
973 //Write the DRS calib table here !
974 writeDrsCalib();
975
976 //we are now at the beginning of the main table. Write its header
977 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
978 _file.write(it->fitsString().c_str(), 80);
979
980 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
981 _file.write(it->fitsString().c_str(), 80);
982
983 _file.write("END ", 80);
984 long here = _file.tellp();
985 if (here%2880)
986 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
987
988 _headerFlushed = true;
989
990 here = _file.tellp();
991
992 if (here%2880)
993 cout << "Error: seems that header did not finish at the end of a block." << endl;
994
995 if (here > cPos && cPos != 0)
996 {
997 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
998 return;
999 }
1000
1001 here = _file.tellp();
1002 writeCatalog(closingFile);
1003
1004 here = _file.tellp() - here;
1005 _heapPtr = here;
1006
1007 if (cPos != 0)
1008 _file.seekp(cPos);
1009}
1010
1011/****************************************************************
1012 * WRITE CATALOG
1013 * WARNING: writeCatalog is only meant to be used by writeHeader.
1014 * external usage will most likely corrupt the file
1015 ****************************************************************/
1016void CompressedFitsWriter::writeCatalog(bool closingFile)
1017{
1018 uint32_t sizeWritten = 0;
1019 for (uint32_t i=0;i<_catalog.size();i++)
1020 {
1021 for (uint32_t j=0;j<_catalog[i].size();j++)
1022 {
1023 //swap the bytes
1024 int8_t swappedEntry[16];
1025 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1026 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1027 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1028 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1029 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1030 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1031 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1032 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1033
1034 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1035 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1036 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1037 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1038 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1039 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1040 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1041 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1042 if (closingFile)
1043 {
1044 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1045 }
1046 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1047 sizeWritten += 2*sizeof(int64_t);
1048 }
1049 }
1050
1051 //we do not reserve space for now because fverify does not like that.
1052 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1053 return ;
1054
1055 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1056 if (sizeWritten % 2880 != 0)
1057 {
1058 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1059 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1060 }
1061}
1062
1063/****************************************************************
1064 * ADD HEADER CHECKSUM
1065 ****************************************************************/
1066void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1067{
1068 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1069 _checksum.add(it->fitsString().c_str(), 80);
1070 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1071 _checksum.add(it->fitsString().c_str(), 80);
1072 string end("END ");
1073 string space(" ");
1074 checksum.add(end.c_str(), 80);
1075 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1076 for (int i=0;i<headerRowsLeft;i++)
1077 checksum.add(space.c_str(), 80);
1078}
1079
1080/****************************************************************
1081 * CLOSE
1082 ****************************************************************/
1083bool CompressedFitsWriter::close()
1084{
1085 for (uint32_t i=0;i<_numThreads;i++)
1086 while (_threadStatus[i] != _THREAD_WAIT_)
1087 usleep(100000);
1088 for (uint32_t i=0;i<_numThreads;i++)
1089 _threadStatus[i] = _THREAD_EXIT_;
1090 for (uint32_t i=0;i<_numThreads;i++)
1091 pthread_join(_thread[i], NULL);
1092 //flush the rows that were not written yet
1093 if (_totalNumRows%_numRowsPerTile != 0)
1094 {
1095 copyTransposeTile(0);
1096
1097 _threadNumRows[0] = _totalNumRows;
1098 uint32_t numBytes = compressBuffer(0);
1099 writeCompressedDataToDisk(0, numBytes);
1100 }
1101 //compression stuff
1102 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1103 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1104 //TODO calculate the real offset from the main table to the start of the HEAP data area
1105 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1106 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1107
1108 //regular stuff
1109 if (_catalog.size() > 0)
1110 {
1111 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1112 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1113 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1114 int64_t heapSize = 0;
1115 int64_t compressedOffset = 0;
1116 for (uint32_t i=0;i<_catalog.size();i++)
1117 {
1118 compressedOffset += sizeof(TileHeader);
1119 heapSize += sizeof(TileHeader);
1120 for (uint32_t j=0;j<_catalog[i].size();j++)
1121 {
1122 heapSize += _catalog[i][j].first;
1123// cout << "heapSize: " << heapSize << endl;
1124 //set the catalog offsets to their actual values
1125 _catalog[i][j].second = compressedOffset;
1126 compressedOffset += _catalog[i][j].first;
1127 //special case if entry has zero length
1128 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1129 }
1130 }
1131 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1132 }
1133 else
1134 {
1135 setHeaderKey(HeaderEntry("NAXIS1", _columns.size()*2*sizeof(int64_t), "width of table in bytes"));
1136 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1137 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1138 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1139 changeHeaderKey("THEAP", "ZHEAP");
1140 }
1141 ostringstream str;
1142
1143 writeHeader(true);
1144
1145 str.str("");
1146 str << _checksum.val();
1147
1148 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1149 addHeaderChecksum(_checksum);
1150 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1151 //update header value
1152 writeHeader();
1153 //update file length
1154 long here = _file.tellp();
1155 if (here%2880)
1156 {
1157 vector<char> nullVec(2880 - here%2880, 0);
1158 _file.write(nullVec.data(), 2880 - here%2880);
1159 }
1160 _file.close();
1161 return true;
1162}
1163
1164/****************************************************************
1165 * COPY TRANSPOSE TILE
1166 ****************************************************************/
1167void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1168{
1169 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1170
1171 //copy the tile and transpose it
1172 uint32_t offset = 0;
1173 for (uint32_t i=0;i<_columns.size();i++)
1174 {
1175 switch (_columns[i].getColumnOrdering())//getCompression())
1176 {
1177 case FITS::kOrderByRow:
1178 for (uint32_t k=0;k<thisRoundNumRows;k++)
1179 {//regular, "semi-transposed" copy
1180 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1181 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1182 }
1183 break;
1184
1185 case FITS::kOrderByCol :
1186 for (int j=0;j<_columns[i].numElems();j++)
1187 for (uint32_t k=0;k<thisRoundNumRows;k++)
1188 {//transposed copy
1189 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1190 offset += _columns[i].sizeOfElems();
1191 }
1192 break;
1193 default:
1194 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1195
1196 };
1197 }
1198}
1199
1200/****************************************************************
1201 * WRITE BINARY ROW
1202 ****************************************************************/
1203bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1204{
1205 if (_totalNumRows == 0)
1206 writeHeader();
1207
1208 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1209 _totalNumRows++;
1210 if (_totalNumRows%_numRowsPerTile == 0)
1211 {
1212 //which is the next thread that we should use ?
1213 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1214 usleep(100000);
1215
1216 copyTransposeTile(_threadLooper);
1217
1218 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1219 usleep(100000);
1220
1221 _threadNumRows[_threadLooper] = _totalNumRows;
1222 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1223 _threadLooper = (_threadLooper+1)%_numThreads;
1224 }
1225 return _file.good();
1226}
1227
1228uint32_t CompressedFitsWriter::getRowWidth()
1229{
1230 return _rowWidth;
1231}
1232
1233/****************************************************************
1234 * COMPRESS BUFFER
1235 ****************************************************************/
1236uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1237{
1238 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1239 return numRows*sizeOfElems*numRowElems;
1240}
1241
1242/****************************************************************
1243 * COMPRESS BUFFER
1244 ****************************************************************/
1245uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1246{
1247 string huffmanOutput;
1248 uint32_t previousHuffmanSize = 0;
1249 if (numRows < 2)
1250 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1251 return numRows*sizeOfElems*numRowElems + 1000;
1252 }
1253 if (sizeOfElems < 2 )
1254 {
1255 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1256 return 0;
1257 }
1258 uint32_t huffmanOffset = 0;
1259 for (uint32_t j=0;j<numRowElems;j++)
1260 {
1261 Huffman::Encode(huffmanOutput,
1262 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1263 numRows*(sizeOfElems/2));
1264 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1265 huffmanOffset += sizeof(uint32_t);
1266 previousHuffmanSize = huffmanOutput.size();
1267 }
1268 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1269
1270 //only copy if not larger than not-compressed size
1271 if (totalSize < numRows*sizeOfElems*numRowElems)
1272 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1273
1274 return totalSize;
1275}
1276
1277/****************************************************************
1278 * COMPRESS BUFFER
1279 ****************************************************************/
1280uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1281{
1282 uint32_t colWidth = numRowElems;
1283 for (int j=colWidth*numRows-1;j>1;j--)
1284 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1285 //call the huffman transposed
1286 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1287}
1288
1289uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1290{
1291 uint32_t colWidth = numRowElems;
1292 for (int j=colWidth*numRows-1;j>1;j--)
1293 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1294
1295 return numRows*sizeOfElems*numRowElems;
1296}
1297/****************************************************************
1298 * COMPRESS BUFFER
1299 ****************************************************************/
1300uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1301{
1302 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1303 uint32_t offset=0;
1304 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1305 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1306
1307 //now compress each column one by one by calling compression on arrays
1308 for (uint32_t i=0;i<_columns.size();i++)
1309 {
1310 _catalog[currentCatalogRow][i].second = compressedOffset;
1311
1312 if (_columns[i].numElems() == 0) continue;
1313
1314 BlockHeader& head = _columns[i].getBlockHeader();
1315 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1316 //set the default byte telling if uncompressed the compressed Flag
1317 uint64_t previousOffset = compressedOffset;
1318 //skip header data
1319 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1320
1321 for (uint32_t j=0;j<sequence.size(); j++)
1322 {
1323 switch (sequence[j])
1324 {
1325 case FITS::kFactRaw:
1326// if (head.numProcs == 1)
1327 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1328 break;
1329 case FITS::kFactSmoothing:
1330 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1331 break;
1332 case FITS::kFactHuffman16:
1333 if (head.ordering == FITS::kOrderByCol)
1334 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1335 else
1336 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1337 break;
1338 default:
1339 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1340 break;
1341 }
1342 }
1343
1344 //check if compressed size is larger than uncompressed
1345 if (sequence[0] != FITS::kFactRaw &&
1346 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1347 {//if so set flag and redo it uncompressed
1348 cout << "REDOING UNCOMPRESSED" << endl;
1349 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1350 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1351 BlockHeader he;
1352 he.size = compressedOffset - previousOffset;
1353 he.numProcs = 1;
1354 he.ordering = FITS::kOrderByRow;
1355 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1356 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FITS::kFactRaw;
1357 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1358 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1359 continue;
1360 }
1361 head.size = compressedOffset - previousOffset;
1362 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1363 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1364
1365 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1366 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1367 }
1368
1369 TileHeader tHead(thisRoundNumRows, compressedOffset);
1370 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1371 return compressedOffset;
1372}
1373
1374/****************************************************************
1375 * WRITE COMPRESS DATA TO DISK
1376 ****************************************************************/
1377bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1378{
1379 char* checkSumPointer = _compressedBuffer[threadID];
1380 int32_t extraBytes = 0;
1381 uint32_t sizeToChecksum = sizeToWrite;
1382 if (_checkOffset != 0)
1383 {//should we extend the array to the left ?
1384 sizeToChecksum += _checkOffset;
1385 checkSumPointer -= _checkOffset;
1386 memset(checkSumPointer, 0, _checkOffset);
1387 }
1388 if (sizeToChecksum%4 != 0)
1389 {//should we extend the array to the right ?
1390 extraBytes = 4 - (sizeToChecksum%4);
1391 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1392 sizeToChecksum += extraBytes;
1393 }
1394 //do the checksum
1395 _checksum.add(checkSumPointer, sizeToChecksum);
1396// cout << endl << "Checksum: " << _checksum.val() << endl;
1397 _checkOffset = (4 - extraBytes)%4;
1398 //write data to disk
1399 _file.write(_compressedBuffer[threadID], sizeToWrite);
1400 return _file.good();
1401}
1402
1403/****************************************************************
1404 * WRITER THREAD LOOP
1405 ****************************************************************/
1406void* CompressedFitsWriter::threadFunction(void* context)
1407{
1408 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1409
1410 uint32_t myID = 0;
1411 pthread_mutex_lock(&(myself->_mutex));
1412 myID = myself->_threadIndex++;
1413 pthread_mutex_unlock(&(myself->_mutex));
1414 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1415
1416 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1417 {
1418 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1419 usleep(100000);
1420 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1421 continue;
1422 uint32_t numBytes = myself->compressBuffer(myID);
1423 myself->_threadStatus[myID] = _THREAD_WRITE_;
1424
1425 //wait for the previous data to be written
1426 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1427 usleep(1000);
1428 //do the actual writing to disk
1429 pthread_mutex_lock(&(myself->_mutex));
1430 myself->writeCompressedDataToDisk(myID, numBytes);
1431 myself->_threadIndex = myID;
1432 pthread_mutex_unlock(&(myself->_mutex));
1433 myself->_threadStatus[myID] = _THREAD_WAIT_;
1434 }
1435 return NULL;
1436}
1437
1438/****************************************************************
1439 * PRINT USAGE
1440 ****************************************************************/
1441void printUsage()
1442{
1443 cout << endl;
1444 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1445 " and compresses it.\n It can use a drs calibration in order to"
1446 " improve the compression ratio. If so, the input calibration"
1447 " is embedded into the compressed file.\n"
1448 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1449 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1450 "Usage: Compressed_Fits_Test <inputFile>";
1451 cout << endl;
1452}
1453
1454/****************************************************************
1455 * PRINT HELP
1456 ****************************************************************/
1457void printHelp()
1458{
1459 cout << endl;
1460 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1461 "The fz extension will be added, replacing the .gz one if required \n"
1462 "If output is specified, then it will replace the automatically generated output filename\n"
1463 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1464 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1465 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1466 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1467 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1468 "UNCOMPRESSED\n"
1469 "AMPLITUDE\n"
1470 "HUFFMAN\n"
1471 "SMOOTHMAN\n"
1472 "INT_WAVELET\n"
1473 "\n"
1474 "--quiet removes any textual output, except error messages\n"
1475 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1476 ;
1477 cout << endl << endl;
1478}
1479
1480/****************************************************************
1481 * SETUP CONFIGURATION
1482 ****************************************************************/
1483void setupConfiguration(Configuration& conf)
1484{
1485 po::options_description configs("FitsCompressor options");
1486 configs.add_options()
1487 ("inputFile,i", vars<string>(), "Input file")
1488 ("drs,d", var<string>(), "Input drs calibration file")
1489 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1490 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1491 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1492 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1493 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1494 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1495 ;
1496 po::positional_options_description positional;
1497 positional.add("inputFile", -1);
1498 conf.AddOptions(configs);
1499 conf.SetArgumentPositions(positional);
1500}
1501
1502/****************************************************************
1503 * MAIN
1504 ****************************************************************/
1505int main(int argc, const char** argv)
1506{
1507 Configuration conf(argv[0]);
1508 conf.SetPrintUsage(printUsage);
1509 setupConfiguration(conf);
1510
1511 if (!conf.DoParse(argc, argv, printHelp))
1512 return -1;
1513
1514 //initialize the file names to nothing.
1515 string fileNameIn = "";
1516 string fileNameOut = "";
1517 string drsFileName = "";
1518 uint32_t numRowsPerTile = 100;
1519 bool displayText=true;
1520
1521 //parse configuration
1522 if (conf.Get<bool>("quiet")) displayText = false;
1523 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1524 if (inputFileNameVec.size() != 1)
1525 {
1526 cout << "Error: ";
1527 if (inputFileNameVec.size() == 0) cout << "no";
1528 else cout << inputFileNameVec.size();
1529 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1530 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1531 cout << inputFileNameVec[i] << endl;
1532 return -1;
1533 }
1534
1535 //Assign the input filename
1536 fileNameIn = inputFileNameVec[0];
1537
1538 //Check if we have a drs calib too
1539 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1540
1541 //Should we verify the data ?
1542 bool verifyDataPlease = false;
1543 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1544
1545
1546 //should we use a specific output filename ?
1547 if (conf.Has("output"))
1548 fileNameOut = conf.Get<string>("output");
1549 else
1550 {
1551 size_t pos = fileNameIn.find(".fits.fz");
1552 if (pos == string::npos)
1553 {
1554 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1555 return -1;
1556 }
1557 fileNameOut = fileNameIn.substr(0, pos) + ".fits";
1558 }
1559
1560
1561 //should we use specific compression on some columns ?
1562 const vector<string> columnsCompression = conf.Vec<string>("compression");
1563
1564 //split up values between column names and compression scheme
1565 vector<std::pair<string, string>> compressions;
1566 for (unsigned int i=0;i<columnsCompression.size();i++)
1567 {
1568 size_t pos = columnsCompression[i].find_first_of("=");
1569 if (pos == string::npos)
1570 {
1571 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1572 return -1;
1573 }
1574 string comp = columnsCompression[i].substr(pos+1);
1575 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1576 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1577 {
1578 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1579 return -1;
1580 }
1581 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1582 }
1583
1584 //How many rows per tile should we use ?
1585 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1586
1587 //////////////////////////////////////////////////////////////////////////////////////
1588 // Done reading configuration. Open relevant files
1589 //////////////////////////////////////////////////////////////////////////////////////
1590
1591
1592 //Open input's fits file
1593 factfits inFile(fileNameIn, "", "Events", false);
1594
1595 if (!inFile.IsCompressedFITS())
1596 {
1597 cout << "ERROR: input file is NOT a compressed fits. Cannot be decompressed: Aborting." << endl;
1598 return -1;
1599 }
1600
1601 //decide how many tiles should be put in the compressed file
1602 uint32_t originalNumRows = inFile.GetNumRows();
1603 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1604// CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1605
1606 //should we use a specific number of threads for compressing ?
1607 unsigned int numThreads = 1;
1608 if (conf.Has("threads"))
1609 {
1610 numThreads = conf.Get<unsigned int>("threads");
1611// outFile.setNumWorkingThreads(numThreads);
1612 }
1613
1614
1615
1616 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1617 factfits* drsFile = NULL;
1618 //try to open the Drs file. If any.
1619 if (drsFileName != "")
1620 {
1621 try
1622 {
1623 drsFile = new factfits(drsFileName);
1624 }
1625 catch (...)
1626 {
1627 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1628 return -1;
1629 }
1630 }
1631
1632 if (displayText)
1633 {
1634 cout << endl;
1635 cout << "**********************" << endl;
1636 cout << "Will decompress from : " << fileNameIn << endl;
1637 cout << "to : " << fileNameOut << endl;
1638 cout << "**********************" << endl;
1639 cout << endl;
1640 }
1641
1642 //////////////////////////////////////////////////////////////////////////////////////
1643 // Done opening input files. Allocate memory and configure output file
1644 //////////////////////////////////////////////////////////////////////////////////////
1645
1646 //allocate the buffer for temporary storage of each read/written row
1647 uint32_t rowWidth = inFile.GetUInt("ZNAXIS1");
1648 char* buffer = new char[rowWidth + 12];
1649 memset(buffer, 0, 4);
1650 buffer = buffer+4;
1651
1652 //get the source columns
1653 const fits::Table::Columns& columns = inFile.GetColumns();
1654 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1655 if (displayText)
1656 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1657
1658
1659 //////////////////////////////////////////////////////////////////////////////////////
1660 // Done configuring compression. Do the real job now !
1661 //////////////////////////////////////////////////////////////////////////////////////
1662 vector<void*> readPointers;
1663 vector<int32_t> readOffsets;
1664 vector<int32_t> readElemSize;
1665 vector<int32_t> readNumElems;
1666 //Get table name for later use in case the compressed file is to be verified
1667 string tableName = inFile.GetStr("EXTNAME");
1668
1669
1670 //and the header of the compressed file
1671 const fits::Table::Keys& header2 = inFile.GetKeys();
1672
1673 //get a non-compressed writer
1674 ofits reconstructedFile;
1675
1676 //figure out its name: /dev/null unless otherwise specified
1677 string reconstructedName = fileNameOut;
1678 reconstructedFile.open(reconstructedName.c_str(), false);
1679
1680 //reconstruct the original columns from the compressed file.
1681 string origChecksumStr;
1682 string origDatasum;
1683
1684 //reset tablename value so that it is re-read from compressed table's header
1685 tableName = "";
1686
1687 /************************************************************************************
1688 * Reconstruction setup done. Rebuild original header
1689 ************************************************************************************/
1690
1691 //re-tranlate the keys
1692 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1693 {
1694 string k = it->first;
1695 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1696 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1697 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1698 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER" || k == "ZHEAP")
1699 {
1700 continue;
1701 }
1702
1703 if (k == "ZCHKSUM")
1704 {
1705 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1706 origChecksumStr = it->second.value;
1707 continue;
1708 }
1709 if (k == "RAWSUM")
1710 {
1711 continue;
1712 }
1713
1714 if (k == "ZDTASUM")
1715 {
1716 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1717 origDatasum = it->second.value;
1718 continue;
1719 }
1720
1721 if (k == "EXTNAME")
1722 {
1723 tableName = it->second.value;
1724 }
1725
1726 k = k.substr(0,5);
1727
1728 if (k == "TTYPE")
1729 {//we have an original column name here.
1730 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1731 continue;
1732 }
1733
1734 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1735 {
1736 continue;
1737 }
1738
1739 if (k == "ZFORM" || k == "ZTYPE")
1740 {
1741 string tmpKey = it->second.fitsString;
1742 tmpKey[0] = 'T';
1743 reconstructedFile.SetKeyFromFitsString(tmpKey);
1744 continue;
1745 }
1746
1747 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1748 }
1749
1750 if (tableName == "")
1751 {
1752 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1753 return -1;
1754 }
1755
1756 //Restore the original columns
1757 for (uint32_t numCol=1; numCol<10000; numCol++)
1758 {
1759 ostringstream str;
1760 str << numCol;
1761 if (!inFile.HasKey("TTYPE"+str.str())) break;
1762
1763 string ttype = inFile.GetStr("TTYPE"+str.str());
1764 string tform = inFile.GetStr("ZFORM"+str.str());
1765 char type = tform[tform.size()-1];
1766 string number = tform.substr(0, tform.size()-1);
1767 int numElems = atoi(number.c_str());
1768
1769 if (number == "") numElems=1;
1770
1771 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1772 }
1773
1774 reconstructedFile.WriteTableHeader(tableName.c_str());
1775
1776 /************************************************************************************
1777 * Original header restored. Do the data
1778 ************************************************************************************/
1779
1780 //set pointers to the readout data to later be able to gather it to "buffer".
1781 readPointers.clear();
1782 readOffsets.clear();
1783 readElemSize.clear();
1784 readNumElems.clear();
1785 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1786 {
1787 readPointers.push_back(inFile.SetPtrAddress(it->first));
1788 readOffsets.push_back(it->second.offset);
1789 readElemSize.push_back(it->second.size);
1790 readNumElems.push_back(it->second.num);
1791 }
1792
1793 //do the actual reconstruction work
1794 uint32_t i=1;
1795 while (i<=inFile.GetNumRows() && inFile.GetNextRow())
1796 {
1797 int count=0;
1798 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1799 {
1800 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1801 count++;
1802 }
1803 if (displayText) cout << "\r Row " << i << flush;
1804 reconstructedFile.WriteRow(buffer, rowWidth);
1805 if (!reconstructedFile.good())
1806 {
1807 cout << "ERROR: no space left on device (probably)" << endl;
1808 return -1;
1809 }
1810 i++;
1811 }
1812
1813 if (displayText) cout << endl;
1814
1815 //close reconstruction input and output
1816// Do NOT close the verify file, otherwise data cannot be flushed to copy file
1817// verifyFile.close();
1818 if (!inFile.IsFileOk())
1819 cout << "ERROR: file checksums seems wrong" << endl;
1820
1821 if (!reconstructedFile.close())
1822 {
1823 cout << "ERROR: disk probably full..." <<endl;
1824 return -1;
1825 }
1826
1827 //get original and reconstructed checksum and datasum
1828 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
1829 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
1830
1831 //verify that no mistake was made
1832 if (origChecksum.second != newChecksum.second)
1833 {
1834 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
1835 return -1;
1836 }
1837 if (origChecksum.first != newChecksum.first)
1838 {
1839 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
1840 }
1841 else
1842 {
1843 if (true) cout << "Ok" << endl;
1844 }
1845
1846 buffer = buffer-4;
1847 delete[] buffer;
1848 return 0;
1849}
1850
Note: See TracBrowser for help on using the repository browser.