source: trunk/FACT++/src/fitsCompressor.cc@ 16814

Last change on this file since 16814 was 16814, checked in by lyard, 11 years ago
added block headers and changed compression mechanism
File size: 84.5 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=FACT_ROW_MAJOR,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 ///@brief open a new fits file
427 bool open(const string& fileName, const string& tableName="Data");
428
429 ///@brief close the opened file
430 bool close();
431
432 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
433 bool writeBinaryRow(const char* bufferToWrite);
434
435 ///@brief assign a given (already loaded) drs calibration
436 void setDrsCalib(int16_t* data);
437
438 ///@brief set the number of worker threads compressing the data.
439 bool setNumWorkingThreads(uint32_t num);
440
441 private:
442 ///@brief compresses one buffer of data, the one given by threadIndex
443 uint64_t compressBuffer(uint32_t threadIndex);
444
445 ///@brief writes an already compressed buffer to disk
446 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
447
448 ///@brief add the header checksum to the datasum
449 void addHeaderChecksum(Checksum& checksum);
450
451 ///@brief write the header. If closingFile is set to true, checksum is calculated
452 void writeHeader(bool closingFile = false);
453
454 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
455 void writeCatalog(bool closingFile=false);
456
457 /// FIXME this was a bad idea. Move everything to the regular header
458 vector<HeaderEntry> _defaultHeader;
459
460 /// the main function compressing the data
461 static void* threadFunction(void* context);
462
463 /// Write the drs calibration to disk, if any
464 void writeDrsCalib();
465
466 /// Copy and transpose (or semi-transpose) one tile of data
467 void copyTransposeTile(uint32_t index);
468
469 /// Specific compression functions
470 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
471 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
472 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
473 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
474
475 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
476 int16_t* _drsCalibData; ///< array of the Drs baseline mean
477 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
478 pthread_mutex_t _mutex; ///< mutex for compressing threads
479
480
481 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
482 static string _fitsHeader; ///< the default header to be written in every fits file
483};
484
485const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
486const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
487const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
488const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
489const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
490const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
491
492template<>
493void CompressedFitsFile::HeaderEntry::setValue(const string& v)
494{
495 string val = v;
496 if (val.size() > 2 && val[0] == '\'')
497 {
498 size_t pos = val.find_last_of("'");
499 if (pos != string::npos && pos != 0)
500 val = val.substr(1, pos-1);
501 }
502 ostringstream str;
503
504 str << "'" << val << "'";
505 for (int i=str.str().length(); i<20;i++)
506 str << " ";
507 _value = str.str();
508 buildFitsString();
509}
510
511/**
512 * Default header to be written in all fits files
513 */
514string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
515 "BITPIX = 8 / number of bits per data pixel "
516 "NAXIS = 0 / number of data axes "
517 "EXTEND = T / FITS dataset may contain extensions "
518 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
519 "DATASUM = ' 0' / Checksum for the data block "
520 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
521 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
522 "END "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " ";
550
551/**
552 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
553 *
554 */
555string CompressedFitsWriter::_emptyBlock = " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " ";
591
592CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
593/****************************************************************
594 * SUPER CLASS DEFAULT CONSTRUCTOR
595 ****************************************************************/
596CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
597 uint32_t numRowsPerTile) : _header(),
598 _columns(),
599 _numTiles(numTiles),
600 _numRowsPerTile(numRowsPerTile),
601 _totalNumRows(0),
602 _rowWidth(0),
603 _headerFlushed(false),
604 _buffer(NULL),
605 _checksum(0),
606 _heapPtr(0),
607 _transposedBuffer(1),
608 _compressedBuffer(1),
609 _numThreads(1),
610 _threadIndex(0),
611 _thread(1),
612 _threadNumRows(1),
613 _threadStatus(1)
614{
615 _catalog.resize(_numTiles);
616 _transposedBuffer[0] = NULL;
617 _compressedBuffer[0] = NULL;
618 _threadStatus[0] = _THREAD_WAIT_;
619 _threadNumRows[0] = 0;
620}
621
622/****************************************************************
623 * SUPER CLASS DEFAULT DESTRUCTOR
624 ****************************************************************/
625CompressedFitsFile::~CompressedFitsFile()
626{
627 if (_buffer != NULL)
628 {
629 delete[] _buffer;
630 _buffer = NULL;
631 for (uint32_t i=0;i<_numThreads;i++)
632 {
633 _compressedBuffer[i] = _compressedBuffer[i]-4;
634 delete[] _transposedBuffer[i];
635 delete[] _compressedBuffer[i];
636 _transposedBuffer[i] = NULL;
637 _compressedBuffer[i] = NULL;
638 }
639 }
640 if (_file.is_open())
641 _file.close();
642}
643
644/****************************************************************
645 * REALLOCATE BUFFER
646 ****************************************************************/
647bool CompressedFitsFile::reallocateBuffers()
648{
649 if (_buffer)
650 {
651 delete[] _buffer;
652 for (uint32_t i=0;i<_compressedBuffer.size();i++)
653 {
654 _compressedBuffer[i] = _compressedBuffer[i]-4;
655 delete[] _transposedBuffer[i];
656 delete[] _compressedBuffer[i];
657 }
658 }
659 _buffer = new char[_rowWidth*_numRowsPerTile];
660 if (_buffer == NULL) return false;
661 if (_compressedBuffer.size() != _numThreads)
662 {
663 _transposedBuffer.resize(_numThreads);
664 _compressedBuffer.resize(_numThreads);
665 }
666 for (uint32_t i=0;i<_numThreads;i++)
667 {
668 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
669 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader)]; //use a bit more memory for compression flags
670 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
671 return false;
672 //shift the compressed buffer by 4 bytes, for checksum calculation
673 memset(_compressedBuffer[i], 0, 4);
674 _compressedBuffer[i] = _compressedBuffer[i]+4;
675 //initialize the tile header
676 TileHeader tileHeader;
677 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
678 }
679 return true;
680}
681
682/****************************************************************
683 * DEFAULT WRITER CONSTRUCTOR
684 ****************************************************************/
685CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
686 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
687 _checkOffset(0),
688 _drsCalibData(NULL),
689 _threadLooper(0)
690{
691 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
692 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
693 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
694 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
695 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
696 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
697 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
698 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
699 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
700 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
701 //compression stuff
702 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
703 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
704 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
705 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
706 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
707 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
708 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
709
710 pthread_mutex_init(&_mutex, NULL);
711}
712
713/****************************************************************
714 * DEFAULT DESTRUCTOR
715 ****************************************************************/
716CompressedFitsWriter::~CompressedFitsWriter()
717{
718 pthread_mutex_destroy(&_mutex);
719}
720
721/****************************************************************
722 * SET THE POINTER TO THE DRS CALIBRATION
723 ****************************************************************/
724void CompressedFitsWriter::setDrsCalib(int16_t* data)
725{
726 _drsCalibData = data;
727}
728
729/****************************************************************
730 * SET NUM WORKING THREADS
731 ****************************************************************/
732bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
733{
734 if (_file.is_open())
735 return false;
736 if (num < 1 || num > 64)
737 {
738 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
739 return false;
740 }
741 _numThreads = num;
742 _transposedBuffer[0] = NULL;
743 _compressedBuffer[0] = NULL;
744 _threadStatus.resize(num);
745 _thread.resize(num);
746 _threadNumRows.resize(num);
747 for (uint32_t i=0;i<num;i++)
748 {
749 _threadNumRows[i] = 0;
750 _threadStatus[i] = _THREAD_WAIT_;
751 }
752 return reallocateBuffers();
753}
754
755/****************************************************************
756 * WRITE DRS CALIBRATION TO FILE
757 ****************************************************************/
758void CompressedFitsWriter::writeDrsCalib()
759{
760 //if file was not loaded, ignore
761 if (_drsCalibData == NULL)
762 return;
763 uint64_t whereDidIStart = _file.tellp();
764 vector<HeaderEntry> header;
765 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
766 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
767 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
768 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
769 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
770 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
771 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
772 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
773 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
774 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
775 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
776 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
777 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
778
779 for (uint32_t i=0;i<header.size();i++)
780 _file.write(header[i].fitsString().c_str(), 80);
781 //End the header
782 _file.write("END ", 80);
783 long here = _file.tellp();
784 if (here%2880)
785 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
786 //now write the data itself
787 int16_t* swappedBytes = new int16_t[1024];
788 Checksum checksum;
789 for (int32_t i=0;i<1440;i++)
790 {
791 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
792 for (int32_t j=0;j<2048;j+=2)
793 {
794 int8_t inter;
795 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
796 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
797 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
798 }
799 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
800 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
801 }
802 uint64_t whereDidIStop = _file.tellp();
803 delete[] swappedBytes;
804 //No need to pad the data, as (1440*1024*2)%2880==0
805
806 //calculate the checksum from the header
807 ostringstream str;
808 str << checksum.val();
809 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
810 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
811 checksum.add(it->fitsString().c_str(), 80);
812 string end("END ");
813 string space(" ");
814 checksum.add(end.c_str(), 80);
815 int headerRowsLeft = 36 - (header.size() + 1)%36;
816 for (int i=0;i<headerRowsLeft;i++)
817 checksum.add(space.c_str(), 80);
818 //udpate the checksum keyword
819 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
820 //and eventually re-write the header data
821 _file.seekp(whereDidIStart);
822 for (uint32_t i=0;i<header.size();i++)
823 _file.write(header[i].fitsString().c_str(), 80);
824 _file.seekp(whereDidIStop);
825}
826
827/****************************************************************
828 * ADD COLUMN
829 ****************************************************************/
830bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
831{
832 if (_totalNumRows != 0)
833 {
834 cout << "Error: cannot add new columns once first row has been written" << endl;
835 return false;
836 }
837 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
838 {
839 if (it->name() == column.name())
840 {
841 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
842 return false;
843 }
844 }
845 _columns.push_back(column);
846 _columns.back().setOffset(_rowWidth);
847 _rowWidth += column.width();
848 reallocateBuffers();
849
850 ostringstream str, str2, str3;
851 str << "TTYPE" << _columns.size();
852 str2 << column.name();
853 str3 << "label for field ";
854 if (_columns.size() < 10) str3 << " ";
855 if (_columns.size() < 100) str3 << " ";
856 str3 << _columns.size();
857 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
858 str.str("");
859 str2.str("");
860 str3.str("");
861 str << "TFORM" << _columns.size();
862 str2 << "1QB";
863 str3 << "data format of field " << _columns.size();
864 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
865 str.str("");
866 str2.str("");
867 str3.str("");
868 str << "ZFORM" << _columns.size();
869 str2 << column.numElems() << column.type();
870 str3 << "Original format of field " << _columns.size();
871 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
872 str.str("");
873 str2.str("");
874 str3.str("");
875 str << "ZCTYP" << _columns.size();
876 str2 << column.getCompressionString();
877 str3 << "Comp. Scheme of field " << _columns.size();
878 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
879 //resize the catalog vector accordingly
880 for (uint32_t i=0;i<_numTiles;i++)
881 {
882 _catalog[i].resize(_columns.size());
883 for (uint32_t j=0;j<_catalog[i].size();j++)
884 _catalog[i][j] = make_pair(0,0);
885 }
886 return true;
887}
888
889/****************************************************************
890 * SET HEADER KEY
891 ****************************************************************/
892bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
893{
894 HeaderEntry ent = entry;
895 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
896 {
897 if (it->key() == entry.key())
898 {
899 if (entry.comment() == "")
900 ent.setComment(it->comment());
901 (*it) = ent;
902 _headerFlushed = false;
903 return true;
904 }
905 }
906 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
907 {
908 if (it->key() == entry.key())
909 {
910 if (entry.comment() == "")
911 ent.setComment(it->comment());
912 (*it) = ent;
913 _headerFlushed = false;
914 return true;
915 }
916 }
917 if (_totalNumRows != 0)
918 {
919 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
920 return false;
921 }
922 _header.push_back(entry);
923 _headerFlushed = false;
924 return true;
925}
926
927/****************************************************************
928 * OPEN
929 ****************************************************************/
930bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
931{
932 _file.open(fileName.c_str(), ios_base::out);
933 if (!_file.is_open())
934 {
935 cout << "Error: Could not open the file (" << fileName << ")." << endl;
936 return false;
937 }
938 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
939 _headerFlushed = false;
940 _threadIndex = 0;
941 //create requested number of threads
942 for (uint32_t i=0;i<_numThreads;i++)
943 pthread_create(&(_thread[i]), NULL, threadFunction, this);
944 //wait for the threads to start
945 while (_numThreads != _threadIndex)
946 usleep(1000);
947 //set the writing fence to the last thread
948 _threadIndex = _numThreads-1;
949 return (_file.good());
950}
951
952/****************************************************************
953 * WRITE HEADER
954 ****************************************************************/
955void CompressedFitsWriter::writeHeader(bool closingFile)
956{
957 if (_headerFlushed)
958 return;
959 if (!_file.is_open())
960 return;
961
962 long cPos = _file.tellp();
963
964 _file.seekp(0);
965
966 _file.write(_fitsHeader.c_str(), 2880);
967
968 //Write the DRS calib table here !
969 writeDrsCalib();
970
971 //we are now at the beginning of the main table. Write its header
972 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
973 _file.write(it->fitsString().c_str(), 80);
974
975 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
976 _file.write(it->fitsString().c_str(), 80);
977
978 _file.write("END ", 80);
979 long here = _file.tellp();
980 if (here%2880)
981 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
982
983 _headerFlushed = true;
984
985 here = _file.tellp();
986
987 if (here%2880)
988 cout << "Error: seems that header did not finish at the end of a block." << endl;
989
990 if (here > cPos && cPos != 0)
991 {
992 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
993 return;
994 }
995
996 here = _file.tellp();
997 writeCatalog(closingFile);
998
999 here = _file.tellp() - here;
1000 _heapPtr = here;
1001
1002 if (cPos != 0)
1003 _file.seekp(cPos);
1004}
1005
1006/****************************************************************
1007 * WRITE CATALOG
1008 * WARNING: writeCatalog is only meant to be used by writeHeader.
1009 * external usage will most likely corrupt the file
1010 ****************************************************************/
1011void CompressedFitsWriter::writeCatalog(bool closingFile)
1012{
1013 uint32_t sizeWritten = 0;
1014 for (uint32_t i=0;i<_catalog.size();i++)
1015 {
1016 for (uint32_t j=0;j<_catalog[i].size();j++)
1017 {
1018 //swap the bytes
1019 int8_t swappedEntry[16];
1020 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1021 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1022 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1023 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1024 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1025 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1026 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1027 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1028
1029 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1030 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1031 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1032 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1033 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1034 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1035 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1036 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1037 if (closingFile)
1038 {
1039 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1040 }
1041 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1042 sizeWritten += 2*sizeof(int64_t);
1043 }
1044 }
1045
1046 //we do not reserve space for now because fverify does not like that.
1047 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1048 return ;
1049
1050 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1051 if (sizeWritten % 2880 != 0)
1052 {
1053 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1054 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1055 }
1056}
1057
1058/****************************************************************
1059 * ADD HEADER CHECKSUM
1060 ****************************************************************/
1061void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1062{
1063 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1064 _checksum.add(it->fitsString().c_str(), 80);
1065 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1066 _checksum.add(it->fitsString().c_str(), 80);
1067 string end("END ");
1068 string space(" ");
1069 checksum.add(end.c_str(), 80);
1070 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1071 for (int i=0;i<headerRowsLeft;i++)
1072 checksum.add(space.c_str(), 80);
1073}
1074
1075/****************************************************************
1076 * CLOSE
1077 ****************************************************************/
1078bool CompressedFitsWriter::close()
1079{
1080 for (uint32_t i=0;i<_numThreads;i++)
1081 while (_threadStatus[i] != _THREAD_WAIT_)
1082 usleep(100000);
1083 for (uint32_t i=0;i<_numThreads;i++)
1084 _threadStatus[i] = _THREAD_EXIT_;
1085 for (uint32_t i=0;i<_numThreads;i++)
1086 pthread_join(_thread[i], NULL);
1087 //flush the rows that were not written yet
1088 if (_totalNumRows%_numRowsPerTile != 0)
1089 {
1090 copyTransposeTile(0);
1091
1092 _threadNumRows[0] = _totalNumRows;
1093 uint32_t numBytes = compressBuffer(0);
1094 writeCompressedDataToDisk(0, numBytes);
1095 }
1096 //compression stuff
1097 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1098 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1099 //TODO calculate the real offset from the main table to the start of the HEAP data area
1100 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1101 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1102
1103 //regular stuff
1104 if (_catalog.size() > 0)
1105 {
1106 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1107 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1108 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1109 int64_t heapSize = 0;
1110 int64_t compressedOffset = 0;
1111 for (uint32_t i=0;i<_catalog.size();i++)
1112 {
1113 compressedOffset += sizeof(TileHeader);
1114 heapSize += sizeof(TileHeader);
1115 for (uint32_t j=0;j<_catalog[i].size();j++)
1116 {
1117 heapSize += _catalog[i][j].first;
1118 //set the catalog offsets to their actual values
1119 _catalog[i][j].second = compressedOffset;
1120 compressedOffset += _catalog[i][j].first;
1121 //special case if entry has zero length
1122 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1123 }
1124 }
1125 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1126 }
1127 writeHeader(true);
1128 ostringstream str;
1129 str << _checksum.val();
1130 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1131 addHeaderChecksum(_checksum);
1132 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1133 //update header value
1134 writeHeader();
1135 //update file length
1136 long here = _file.tellp();
1137 if (here%2880)
1138 {
1139 vector<char> nullVec(2880 - here%2880, 0);
1140 _file.write(nullVec.data(), 2880 - here%2880);
1141 }
1142 _file.close();
1143 return true;
1144}
1145
1146/****************************************************************
1147 * COPY TRANSPOSE TILE
1148 ****************************************************************/
1149void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1150{
1151 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1152 //copy the tile and transpose it
1153 uint32_t offset = 0;
1154 for (uint32_t i=0;i<_columns.size();i++)
1155 {
1156 switch (_columns[i].getColumnOrdering())//getCompression())
1157 {
1158 case FACT_ROW_MAJOR:
1159 for (uint32_t k=0;k<thisRoundNumRows;k++)
1160 {//regular, "semi-transposed" copy
1161 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1162 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1163 }
1164 break;
1165
1166 case FACT_COL_MAJOR :
1167 for (int j=0;j<_columns[i].numElems();j++)
1168 for (uint32_t k=0;k<thisRoundNumRows;k++)
1169 {//transposed copy
1170 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1171 offset += _columns[i].sizeOfElems();
1172 }
1173 break;
1174 default:
1175 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1176
1177 };
1178 }
1179}
1180
1181/****************************************************************
1182 * WRITE BINARY ROW
1183 ****************************************************************/
1184bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1185{
1186 if (_totalNumRows == 0)
1187 writeHeader();
1188
1189 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1190 _totalNumRows++;
1191 if (_totalNumRows%_numRowsPerTile == 0)
1192 {
1193 //which is the next thread that we should use ?
1194 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1195 usleep(100000);
1196
1197 copyTransposeTile(_threadLooper);
1198
1199 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1200 usleep(100000);
1201
1202 _threadNumRows[_threadLooper] = _totalNumRows;
1203 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1204 _threadLooper = (_threadLooper+1)%_numThreads;
1205 }
1206 return _file.good();
1207}
1208
1209/****************************************************************
1210 * COMPRESS BUFFER
1211 ****************************************************************/
1212uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1213{
1214 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1215 return numRows*sizeOfElems*numRowElems;
1216}
1217
1218/****************************************************************
1219 * COMPRESS BUFFER
1220 ****************************************************************/
1221uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1222{
1223 string huffmanOutput;
1224 uint32_t previousHuffmanSize = 0;
1225 if (numRows < 2)
1226 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1227 return numRows*sizeOfElems*numRowElems + 1000;
1228 }
1229 if (sizeOfElems < 2 )
1230 {
1231 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1232 return 0;
1233 }
1234 uint32_t huffmanOffset = 0;
1235 for (uint32_t j=0;j<numRowElems;j++)
1236 {
1237 Huffman::Encode(huffmanOutput,
1238 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1239 numRows*(sizeOfElems/2));
1240 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1241 huffmanOffset += sizeof(uint32_t);
1242 previousHuffmanSize = huffmanOutput.size();
1243 }
1244 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1245
1246 //only copy if not larger than not-compressed size
1247 if (totalSize < numRows*sizeOfElems*numRowElems)
1248 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1249
1250 return totalSize;
1251}
1252
1253/****************************************************************
1254 * COMPRESS BUFFER
1255 ****************************************************************/
1256uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1257{
1258 uint32_t colWidth = numRowElems;
1259 for (int j=colWidth*numRows-1;j>1;j--)
1260 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1261 //call the huffman transposed
1262 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1263}
1264
1265uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1266{
1267 uint32_t colWidth = numRowElems;
1268 for (int j=colWidth*numRows-1;j>1;j--)
1269 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1270
1271 return numRows*sizeOfElems*numRowElems;
1272}
1273/****************************************************************
1274 * COMPRESS BUFFER
1275 ****************************************************************/
1276uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1277{
1278 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1279 uint32_t offset=0;
1280 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1281 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1282
1283 //now compress each column one by one by calling compression on arrays
1284 for (uint32_t i=0;i<_columns.size();i++)
1285 {
1286 _catalog[currentCatalogRow][i].second = compressedOffset;
1287
1288 if (_columns[i].numElems() == 0) continue;
1289
1290 BlockHeader& head = _columns[i].getBlockHeader();
1291 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1292 //set the default byte telling if uncompressed the compressed Flag
1293 uint64_t previousOffset = compressedOffset;
1294 //skip header data
1295 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1296
1297 for (uint32_t j=0;j<sequence.size(); j++)
1298 {
1299 switch (sequence[j])
1300 {
1301 case FACT_RAW:
1302 if (head.numProcs == 1)
1303 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1304 break;
1305 case FACT_SMOOTHING:
1306 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1307 break;
1308 case FACT_HUFFMAN16:
1309 if (head.ordering == FACT_COL_MAJOR)
1310 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1311 else
1312 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1313 break;
1314 default:
1315 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1316 break;
1317 }
1318 }
1319
1320 //check if compressed size is larger than uncompressed
1321 if (sequence[0] != FACT_RAW &&
1322 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1323 {//if so set flag and redo it uncompressed
1324 cout << "REDOING UNCOMPRESSED" << endl;
1325 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1326 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1327 BlockHeader he;
1328 he.size = compressedOffset - previousOffset;
1329 he.numProcs = 1;
1330 he.ordering = FACT_ROW_MAJOR;
1331 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1332 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FACT_RAW;
1333 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1334 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1335 continue;
1336 }
1337 head.size = compressedOffset - previousOffset;
1338 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1339 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1340
1341 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1342 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1343 }
1344
1345 TileHeader tHead(thisRoundNumRows, compressedOffset);
1346 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1347 return compressedOffset;
1348}
1349
1350/****************************************************************
1351 * WRITE COMPRESS DATA TO DISK
1352 ****************************************************************/
1353bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1354{
1355 char* checkSumPointer = _compressedBuffer[threadID];
1356 int32_t extraBytes = 0;
1357 uint32_t sizeToChecksum = sizeToWrite;
1358 if (_checkOffset != 0)
1359 {//should we extend the array to the left ?
1360 sizeToChecksum += _checkOffset;
1361 checkSumPointer -= _checkOffset;
1362 memset(checkSumPointer, 0, _checkOffset);
1363 }
1364 if (sizeToChecksum%4 != 0)
1365 {//should we extend the array to the right ?
1366 extraBytes = 4 - (sizeToChecksum%4);
1367 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1368 sizeToChecksum += extraBytes;
1369 }
1370 //do the checksum
1371 _checksum.add(checkSumPointer, sizeToChecksum);
1372 _checkOffset = (4 - extraBytes)%4;
1373 //write data to disk
1374 _file.write(_compressedBuffer[threadID], sizeToWrite);
1375 return _file.good();
1376}
1377
1378/****************************************************************
1379 * WRITER THREAD LOOP
1380 ****************************************************************/
1381void* CompressedFitsWriter::threadFunction(void* context)
1382{
1383 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1384
1385 uint32_t myID = 0;
1386 pthread_mutex_lock(&(myself->_mutex));
1387 myID = myself->_threadIndex++;
1388 pthread_mutex_unlock(&(myself->_mutex));
1389 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1390
1391 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1392 {
1393 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1394 usleep(100000);
1395 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1396 continue;
1397 uint32_t numBytes = myself->compressBuffer(myID);
1398 myself->_threadStatus[myID] = _THREAD_WRITE_;
1399
1400 //wait for the previous data to be written
1401 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1402 usleep(1000);
1403 //do the actual writing to disk
1404 pthread_mutex_lock(&(myself->_mutex));
1405 myself->writeCompressedDataToDisk(myID, numBytes);
1406 myself->_threadIndex = myID;
1407 pthread_mutex_unlock(&(myself->_mutex));
1408 myself->_threadStatus[myID] = _THREAD_WAIT_;
1409 }
1410 return NULL;
1411}
1412
1413/****************************************************************
1414 * PRINT USAGE
1415 ****************************************************************/
1416void printUsage()
1417{
1418 cout << endl;
1419 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1420 " and compresses it.\n It can use a drs calibration in order to"
1421 " improve the compression ratio. If so, the input calibration"
1422 " is embedded into the compressed file.\n"
1423 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1424 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1425 "Usage: Compressed_Fits_Test <inputFile>";
1426 cout << endl;
1427}
1428
1429/****************************************************************
1430 * PRINT HELP
1431 ****************************************************************/
1432void printHelp()
1433{
1434 cout << endl;
1435 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1436 "The fz extension will be added, replacing the .gz one if required \n"
1437 "If output is specified, then it will replace the automatically generated output filename\n"
1438 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1439 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1440 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1441 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1442 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1443 "UNCOMPRESSED\n"
1444 "AMPLITUDE\n"
1445 "HUFFMAN\n"
1446 "SMOOTHMAN\n"
1447 "INT_WAVELET\n"
1448 "\n"
1449 "--quiet removes any textual output, except error messages\n"
1450 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1451 ;
1452 cout << endl << endl;
1453}
1454
1455/****************************************************************
1456 * SETUP CONFIGURATION
1457 ****************************************************************/
1458void setupConfiguration(Configuration& conf)
1459{
1460 po::options_description configs("FitsCompressor options");
1461 configs.add_options()
1462 ("inputFile,i", vars<string>(), "Input file")
1463 ("drs,d", var<string>(), "Input drs calibration file")
1464 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1465 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1466 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1467 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1468 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1469 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1470 ;
1471 po::positional_options_description positional;
1472 positional.add("inputFile", -1);
1473 conf.AddOptions(configs);
1474 conf.SetArgumentPositions(positional);
1475}
1476
1477/****************************************************************
1478 * MAIN
1479 ****************************************************************/
1480int main(int argc, const char** argv)
1481{
1482 Configuration conf(argv[0]);
1483 conf.SetPrintUsage(printUsage);
1484 setupConfiguration(conf);
1485
1486 if (!conf.DoParse(argc, argv, printHelp))
1487 return -1;
1488
1489 //initialize the file names to nothing.
1490 string fileNameIn = "";
1491 string fileNameOut = "";
1492 string drsFileName = "";
1493 uint32_t numRowsPerTile = 100;
1494 bool displayText=true;
1495
1496 //parse configuration
1497 if (conf.Get<bool>("quiet")) displayText = false;
1498 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1499 if (inputFileNameVec.size() != 1)
1500 {
1501 cout << "Error: ";
1502 if (inputFileNameVec.size() == 0) cout << "no";
1503 else cout << inputFileNameVec.size();
1504 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1505 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1506 cout << inputFileNameVec[i] << endl;
1507 return -1;
1508 }
1509
1510 //Assign the input filename
1511 fileNameIn = inputFileNameVec[0];
1512
1513 //Check if we have a drs calib too
1514 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1515
1516 //Should we verify the data ?
1517 bool verifyDataPlease = false;
1518 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1519
1520
1521 //should we use a specific output filename ?
1522 if (conf.Has("output"))
1523 fileNameOut = conf.Get<string>("output");
1524 else
1525 {
1526 size_t pos = fileNameIn.find(".fits");
1527 if (pos == string::npos)
1528 {
1529 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1530 return -1;
1531 }
1532 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1533 }
1534
1535 //should we use specific compression on some columns ?
1536 const vector<string> columnsCompression = conf.Vec<string>("compression");
1537
1538 //split up values between column names and compression scheme
1539 vector<std::pair<string, string>> compressions;
1540 for (unsigned int i=0;i<columnsCompression.size();i++)
1541 {
1542 size_t pos = columnsCompression[i].find_first_of("=");
1543 if (pos == string::npos)
1544 {
1545 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1546 return -1;
1547 }
1548 string comp = columnsCompression[i].substr(pos+1);
1549 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1550 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1551 {
1552 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1553 return -1;
1554 }
1555 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1556 }
1557
1558 //How many rows per tile should we use ?
1559 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1560
1561 /************************************************************************************
1562 * Done reading configuration. Open relevant files
1563 ************************************************************************************/
1564
1565 //Open input's fits file
1566 factfits inFile(fileNameIn);
1567
1568 if (inFile.IsCompressedFITS())
1569 {
1570 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1571 return -1;
1572 }
1573
1574 //decide how many tiles should be put in the compressed file
1575 uint32_t originalNumRows = inFile.GetNumRows();
1576 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1577 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1578
1579 //should we use a specific number of threads for compressing ?
1580 unsigned int numThreads = 1;
1581 if (conf.Has("threads"))
1582 {
1583 numThreads = conf.Get<unsigned int>("threads");
1584 outFile.setNumWorkingThreads(numThreads);
1585 }
1586
1587 if (!outFile.open(fileNameOut))
1588 {
1589 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1590 return -1;
1591 }
1592
1593 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1594 factfits* drsFile = NULL;
1595 //try to open the Drs file. If any.
1596 if (drsFileName != "")
1597 {
1598 try
1599 {
1600 drsFile = new factfits(drsFileName);
1601 }
1602 catch (...)
1603 {
1604 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1605 return -1;
1606 }
1607 }
1608
1609 if (displayText)
1610 {
1611 cout << endl;
1612 cout << "**********************" << endl;
1613 cout << "Will compress from : " << fileNameIn << endl;
1614 cout << "to : " << fileNameOut << endl;
1615 if (drsFileName != "")
1616 cout << "while calibrating with: " << drsFileName << endl;
1617 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1618 cout << "Data will be verified : ";
1619 if (verifyDataPlease)
1620 cout << "yes" << endl;
1621 else
1622 cout << "no (WARNING !)" << endl;
1623 cout << "**********************" << endl;
1624 cout << endl;
1625 }
1626
1627 /************************************************************************************
1628 * Done opening input files. Allocate memory and configure output file
1629 ************************************************************************************/
1630
1631 //allocate the buffer for temporary storage of each read/written row
1632 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1633 char* buffer = new char[rowWidth];
1634
1635 //get the source columns
1636 const fits::Table::Columns& columns = inFile.GetColumns();
1637 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1638 if (displayText)
1639 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1640
1641 //Add columns.
1642 for (uint32_t i=0;i<sortedColumns.size(); i++)
1643 {
1644 //get column name
1645 ostringstream str;
1646 str << "TTYPE" << i+1;
1647 string colName = inFile.GetStr(str.str());
1648 if (displayText)
1649 {
1650 cout << "Column " << colName;
1651 for (uint32_t j=colName.size();j<21;j++)
1652 cout << " ";
1653 cout << " -> ";
1654 }
1655
1656 //get header structures
1657 BlockHeader rawHeader;
1658 BlockHeader smoothmanHeader(0, FACT_ROW_MAJOR, 3);
1659 vector<uint16_t> rawProcessings(1);
1660 rawProcessings[0] = FACT_RAW;
1661 vector<uint16_t> smoothmanProcessings(3);
1662 smoothmanProcessings[0] = FACT_SMOOTHING;
1663 smoothmanProcessings[1] = FACT_HUFFMAN16;
1664 smoothmanProcessings[2] = FACT_RAW;
1665
1666 //first lets see if we do have an explicit request
1667 bool explicitRequest = false;
1668 for (unsigned int j=0;j<compressions.size();j++)
1669 {
1670 if (compressions[j].first == colName)
1671 {
1672 explicitRequest = true;
1673 if (displayText) cout << compressions[j].second << endl;
1674 if (compressions[j].second == "UNCOMPRESSED")
1675 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1676 if (compressions[j].second == "SMOOTHMAN")
1677 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1678 break;
1679 }
1680 }
1681
1682 if (explicitRequest) continue;
1683
1684 if (colName != "Data")
1685 {
1686 if (displayText) cout << "UNCOMPRESSED" << endl;
1687 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1688 }
1689 else
1690 {
1691 if (displayText) cout << "SMOOTHMAN" << endl;
1692 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1693 }
1694 }
1695
1696 //translate original header entries to their Z-version
1697 const fits::Table::Keys& header = inFile.GetKeys();
1698 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1699 {
1700 string k = i->first;//header[i].key();
1701 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1702 continue;
1703 if (k == "CHECKSUM")
1704 {
1705 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1706 continue;
1707 }
1708 if (k == "DATASUM")
1709 {
1710 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1711 continue;
1712 }
1713 k = k.substr(0,5);
1714 if (k == "TTYPE" || k == "TFORM")
1715 {
1716 string tmpKey = i->second.fitsString;
1717 tmpKey[0] = 'Z';
1718 outFile.setHeaderKey(tmpKey);
1719 continue;
1720 }
1721 if (k == "NAXIS")
1722 continue;
1723 outFile.setHeaderKey(i->second.fitsString);
1724 }
1725
1726 //deal with the DRS calib
1727 int16_t* drsCalib16 = NULL;
1728
1729 //load the drs calib. data
1730 int32_t startCellOffset = -1;
1731 if (drsFileName != "")
1732 {
1733 drsCalib16 = new int16_t[1440*1024];
1734 float* drsCalibFloat = NULL;
1735 try
1736 {
1737 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1738 }
1739 catch (...)
1740 {
1741 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1742 return -1;
1743 }
1744
1745 //read the calibration and calculate its integer value
1746 drsFile->GetNextRow();
1747 for (uint32_t i=0;i<1440*1024;i++)
1748 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1749
1750 //assign it to the ouput file
1751 outFile.setDrsCalib(drsCalib16);
1752
1753 //get the start cells offsets
1754 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1755 if (it->first == "StartCellData")
1756 {
1757 startCellOffset = it->second.offset;
1758 if (it->second.type != 'I')
1759 {
1760 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1761 return -1;
1762 }
1763 }
1764
1765 if (startCellOffset == -1)
1766 {
1767 cout << "Could not find StartCellData in input file " << fileNameIn << ". Aborting."<< endl;
1768 return -1;
1769 }
1770 }
1771
1772 /************************************************************************************
1773 * Done configuring compression. Do the real job now !
1774 ************************************************************************************/
1775
1776 if (displayText) cout << "Converting file..." << endl;
1777
1778 int numSlices = -1;
1779 int32_t dataOffset = -1;
1780
1781 //Get the pointer to the column that must be drs-calibrated
1782 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1783 if (it->first == "Data")
1784 {
1785 numSlices = it->second.num;
1786 dataOffset = it->second.offset;
1787 }
1788 if (numSlices % 1440 != 0)
1789 {
1790 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1791 return -1;
1792 }
1793 if (dataOffset == -1)
1794 {
1795 cout << "Could not find the column Data in the input file. Aborting." << endl;
1796 return -1;
1797 }
1798
1799 numSlices /= 1440;
1800
1801 //set pointers to the readout data to later be able to gather it to "buffer".
1802 vector<void*> readPointers;
1803 vector<int32_t> readOffsets;
1804 vector<int32_t> readElemSize;
1805 vector<int32_t> readNumElems;
1806 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1807 {
1808 readPointers.push_back(inFile.SetPtrAddress(it->first));
1809 readOffsets.push_back(it->second.offset);
1810 readElemSize.push_back(it->second.size);
1811 readNumElems.push_back(it->second.num);
1812 }
1813
1814 //Convert each row one after the other
1815 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1816 {
1817 if (displayText) cout << "\r Row " << i+1 << flush;
1818 inFile.GetNextRow();
1819 //copy from inFile internal structures to buffer
1820 int32_t count=0;
1821 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1822 {
1823 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1824 count++;
1825 }
1826
1827 if (startCellOffset != -1)
1828 {//drs calibrate this data
1829 for (int j=0;j<1440;j++)
1830 {
1831 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1832 if (thisStartCell < 0) continue;
1833 for (int k=0;k<numSlices;k++)
1834 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1835 }
1836 }
1837 outFile.writeBinaryRow(buffer);
1838 };
1839
1840 //Get table name for later use in case the compressed file is to be verified
1841 string tableName = inFile.GetStr("EXTNAME");
1842
1843 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1844
1845 inFile.close();
1846 if (!outFile.close())
1847 {
1848 cout << "Something went wrong while writing the catalog: negative index" << endl;
1849 return false;
1850 }
1851
1852 delete[] drsCalib16;
1853
1854 if (displayText) cout << "Done." << endl;
1855
1856 /************************************************************************************
1857 * Actual job done. Should we verify what we did ?
1858 ************************************************************************************/
1859
1860 if (verifyDataPlease)
1861 {
1862 if (displayText) cout << "Now verify data..." << endl;
1863 }
1864 else
1865 return 0;
1866
1867 //get a compressed reader
1868//TEMP try to copy the file too
1869// string copyName("/scratch/copyFile.fz");
1870 string copyName("");
1871 factfits verifyFile(fileNameOut, copyName, tableName, false);
1872
1873 //and the header of the compressed file
1874 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1875
1876 //get a non-compressed writer
1877 ofits reconstructedFile;
1878
1879 //figure out its name: /dev/null unless otherwise specified
1880 string reconstructedName = fileNameOut+".recons";
1881 reconstructedName = "/dev/null";
1882 reconstructedFile.open(reconstructedName.c_str(), false);
1883
1884 //reconstruct the original columns from the compressed file.
1885 string origChecksumStr;
1886 string origDatasum;
1887
1888 //reset tablename value so that it is re-read from compressed table's header
1889 tableName = "";
1890
1891 /************************************************************************************
1892 * Reconstruction setup done. Rebuild original header
1893 ************************************************************************************/
1894
1895 //re-tranlate the keys
1896 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1897 {
1898 string k = it->first;
1899 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1900 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1901 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1902 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1903 {
1904 continue;
1905 }
1906
1907 if (k == "ZCHKSUM")
1908 {
1909 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1910 origChecksumStr = it->second.value;
1911 continue;
1912 }
1913
1914 if (k == "ZDTASUM")
1915 {
1916 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1917 origDatasum = it->second.value;
1918 continue;
1919 }
1920
1921 if (k == "EXTNAME")
1922 {
1923 tableName = it->second.value;
1924 }
1925
1926 k = k.substr(0,5);
1927
1928 if (k == "TTYPE")
1929 {//we have an original column name here.
1930 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1931 continue;
1932 }
1933
1934 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1935 {
1936 continue;
1937 }
1938
1939 if (k == "ZFORM" || k == "ZTYPE")
1940 {
1941 string tmpKey = it->second.fitsString;
1942 tmpKey[0] = 'T';
1943 reconstructedFile.SetKeyFromFitsString(tmpKey);
1944 continue;
1945 }
1946
1947 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1948 }
1949
1950 if (tableName == "")
1951 {
1952 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1953 return -1;
1954 }
1955
1956 //Restore the original columns
1957 for (uint32_t numCol=1; numCol<10000; numCol++)
1958 {
1959 ostringstream str;
1960 str << numCol;
1961 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
1962
1963 string ttype = verifyFile.GetStr("TTYPE"+str.str());
1964 string tform = verifyFile.GetStr("ZFORM"+str.str());
1965 char type = tform[tform.size()-1];
1966 string number = tform.substr(0, tform.size()-1);
1967 int numElems = atoi(number.c_str());
1968
1969 if (number == "") numElems=1;
1970
1971 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1972 }
1973
1974 reconstructedFile.WriteTableHeader(tableName.c_str());
1975
1976 /************************************************************************************
1977 * Original header restored. Do the data
1978 ************************************************************************************/
1979
1980 //set pointers to the readout data to later be able to gather it to "buffer".
1981 readPointers.clear();
1982 readOffsets.clear();
1983 readElemSize.clear();
1984 readNumElems.clear();
1985 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1986 {
1987 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
1988 readOffsets.push_back(it->second.offset);
1989 readElemSize.push_back(it->second.size);
1990 readNumElems.push_back(it->second.num);
1991 }
1992
1993 //do the actual reconstruction work
1994 uint32_t i=1;
1995 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
1996 {
1997 int count=0;
1998 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1999 {
2000 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2001 count++;
2002 }
2003 if (displayText) cout << "\r Row " << i << flush;
2004 reconstructedFile.WriteRow(buffer, rowWidth);
2005 i++;
2006 }
2007
2008 if (displayText) cout << endl;
2009
2010 //close reconstruction input and output
2011// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2012// verifyFile.close();
2013 reconstructedFile.close();
2014
2015 //get original and reconstructed checksum and datasum
2016 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2017 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2018
2019 //verify that no mistake was made
2020 if (origChecksum.second != newChecksum.second)
2021 {
2022 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2023 return -1;
2024 }
2025 if (origChecksum.first != newChecksum.first)
2026 {
2027 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2028 }
2029 else
2030 {
2031 if (displayText) cout << "Ok" << endl;
2032 }
2033
2034 delete[] buffer;
2035 return 0;
2036
2037}
2038
Note: See TracBrowser for help on using the repository browser.