source: trunk/FACT++/src/fitsCompressor.cc@ 16841

Last change on this file since 16841 was 16826, checked in by lyard, 11 years ago
added 0 length files handling
File size: 84.8 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=FACT_ROW_MAJOR,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 ///@brief open a new fits file
427 bool open(const string& fileName, const string& tableName="Data");
428
429 ///@brief close the opened file
430 bool close();
431
432 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
433 bool writeBinaryRow(const char* bufferToWrite);
434
435 ///@brief assign a given (already loaded) drs calibration
436 void setDrsCalib(int16_t* data);
437
438 ///@brief set the number of worker threads compressing the data.
439 bool setNumWorkingThreads(uint32_t num);
440
441 private:
442 ///@brief compresses one buffer of data, the one given by threadIndex
443 uint64_t compressBuffer(uint32_t threadIndex);
444
445 ///@brief writes an already compressed buffer to disk
446 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
447
448 ///@brief add the header checksum to the datasum
449 void addHeaderChecksum(Checksum& checksum);
450
451 ///@brief write the header. If closingFile is set to true, checksum is calculated
452 void writeHeader(bool closingFile = false);
453
454 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
455 void writeCatalog(bool closingFile=false);
456
457 /// FIXME this was a bad idea. Move everything to the regular header
458 vector<HeaderEntry> _defaultHeader;
459
460 /// the main function compressing the data
461 static void* threadFunction(void* context);
462
463 /// Write the drs calibration to disk, if any
464 void writeDrsCalib();
465
466 /// Copy and transpose (or semi-transpose) one tile of data
467 void copyTransposeTile(uint32_t index);
468
469 /// Specific compression functions
470 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
471 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
472 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
473 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
474
475 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
476 int16_t* _drsCalibData; ///< array of the Drs baseline mean
477 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
478 pthread_mutex_t _mutex; ///< mutex for compressing threads
479
480
481 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
482 static string _fitsHeader; ///< the default header to be written in every fits file
483};
484
485const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
486const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
487const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
488const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
489const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
490const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
491
492template<>
493void CompressedFitsFile::HeaderEntry::setValue(const string& v)
494{
495 string val = v;
496 if (val.size() > 2 && val[0] == '\'')
497 {
498 size_t pos = val.find_last_of("'");
499 if (pos != string::npos && pos != 0)
500 val = val.substr(1, pos-1);
501 }
502 ostringstream str;
503
504 str << "'" << val << "'";
505 for (int i=str.str().length(); i<20;i++)
506 str << " ";
507 _value = str.str();
508 buildFitsString();
509}
510
511/**
512 * Default header to be written in all fits files
513 */
514string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
515 "BITPIX = 8 / number of bits per data pixel "
516 "NAXIS = 0 / number of data axes "
517 "EXTEND = T / FITS dataset may contain extensions "
518 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
519 "DATASUM = ' 0' / Checksum for the data block "
520 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
521 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
522 "END "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " ";
550
551/**
552 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
553 *
554 */
555string CompressedFitsWriter::_emptyBlock = " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " ";
591
592CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
593/****************************************************************
594 * SUPER CLASS DEFAULT CONSTRUCTOR
595 ****************************************************************/
596CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
597 uint32_t numRowsPerTile) : _header(),
598 _columns(),
599 _numTiles(numTiles),
600 _numRowsPerTile(numRowsPerTile),
601 _totalNumRows(0),
602 _rowWidth(0),
603 _headerFlushed(false),
604 _buffer(NULL),
605 _checksum(0),
606 _heapPtr(0),
607 _transposedBuffer(1),
608 _compressedBuffer(1),
609 _numThreads(1),
610 _threadIndex(0),
611 _thread(1),
612 _threadNumRows(1),
613 _threadStatus(1)
614{
615 _catalog.resize(_numTiles);
616 _transposedBuffer[0] = NULL;
617 _compressedBuffer[0] = NULL;
618 _threadStatus[0] = _THREAD_WAIT_;
619 _threadNumRows[0] = 0;
620}
621
622/****************************************************************
623 * SUPER CLASS DEFAULT DESTRUCTOR
624 ****************************************************************/
625CompressedFitsFile::~CompressedFitsFile()
626{
627 if (_buffer != NULL)
628 {
629 delete[] _buffer;
630 _buffer = NULL;
631 for (uint32_t i=0;i<_numThreads;i++)
632 {
633 _compressedBuffer[i] = _compressedBuffer[i]-4;
634 delete[] _transposedBuffer[i];
635 delete[] _compressedBuffer[i];
636 _transposedBuffer[i] = NULL;
637 _compressedBuffer[i] = NULL;
638 }
639 }
640 if (_file.is_open())
641 _file.close();
642}
643
644/****************************************************************
645 * REALLOCATE BUFFER
646 ****************************************************************/
647bool CompressedFitsFile::reallocateBuffers()
648{
649 if (_buffer)
650 {
651 delete[] _buffer;
652 for (uint32_t i=0;i<_compressedBuffer.size();i++)
653 {
654 _compressedBuffer[i] = _compressedBuffer[i]-4;
655 delete[] _transposedBuffer[i];
656 delete[] _compressedBuffer[i];
657 }
658 }
659 _buffer = new char[_rowWidth*_numRowsPerTile];
660 if (_buffer == NULL) return false;
661 if (_compressedBuffer.size() != _numThreads)
662 {
663 _transposedBuffer.resize(_numThreads);
664 _compressedBuffer.resize(_numThreads);
665 }
666 for (uint32_t i=0;i<_numThreads;i++)
667 {
668 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
669 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader)]; //use a bit more memory for compression flags
670 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
671 return false;
672 //shift the compressed buffer by 4 bytes, for checksum calculation
673 memset(_compressedBuffer[i], 0, 4);
674 _compressedBuffer[i] = _compressedBuffer[i]+4;
675 //initialize the tile header
676 TileHeader tileHeader;
677 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
678 }
679 return true;
680}
681
682/****************************************************************
683 * DEFAULT WRITER CONSTRUCTOR
684 ****************************************************************/
685CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
686 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
687 _checkOffset(0),
688 _drsCalibData(NULL),
689 _threadLooper(0)
690{
691 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
692 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
693 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
694 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
695 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
696 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
697 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
698 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
699 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
700 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
701 //compression stuff
702 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
703 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
704 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
705 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
706 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
707 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
708 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
709
710 pthread_mutex_init(&_mutex, NULL);
711}
712
713/****************************************************************
714 * DEFAULT DESTRUCTOR
715 ****************************************************************/
716CompressedFitsWriter::~CompressedFitsWriter()
717{
718 pthread_mutex_destroy(&_mutex);
719}
720
721/****************************************************************
722 * SET THE POINTER TO THE DRS CALIBRATION
723 ****************************************************************/
724void CompressedFitsWriter::setDrsCalib(int16_t* data)
725{
726 _drsCalibData = data;
727}
728
729/****************************************************************
730 * SET NUM WORKING THREADS
731 ****************************************************************/
732bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
733{
734 if (_file.is_open())
735 return false;
736 if (num < 1 || num > 64)
737 {
738 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
739 return false;
740 }
741 _numThreads = num;
742 _transposedBuffer[0] = NULL;
743 _compressedBuffer[0] = NULL;
744 _threadStatus.resize(num);
745 _thread.resize(num);
746 _threadNumRows.resize(num);
747 for (uint32_t i=0;i<num;i++)
748 {
749 _threadNumRows[i] = 0;
750 _threadStatus[i] = _THREAD_WAIT_;
751 }
752 return reallocateBuffers();
753}
754
755/****************************************************************
756 * WRITE DRS CALIBRATION TO FILE
757 ****************************************************************/
758void CompressedFitsWriter::writeDrsCalib()
759{
760 //if file was not loaded, ignore
761 if (_drsCalibData == NULL)
762 return;
763 uint64_t whereDidIStart = _file.tellp();
764 vector<HeaderEntry> header;
765 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
766 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
767 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
768 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
769 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
770 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
771 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
772 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
773 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
774 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
775 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
776 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
777 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
778
779 for (uint32_t i=0;i<header.size();i++)
780 _file.write(header[i].fitsString().c_str(), 80);
781 //End the header
782 _file.write("END ", 80);
783 long here = _file.tellp();
784 if (here%2880)
785 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
786 //now write the data itself
787 int16_t* swappedBytes = new int16_t[1024];
788 Checksum checksum;
789 for (int32_t i=0;i<1440;i++)
790 {
791 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
792 for (int32_t j=0;j<2048;j+=2)
793 {
794 int8_t inter;
795 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
796 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
797 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
798 }
799 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
800 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
801 }
802 uint64_t whereDidIStop = _file.tellp();
803 delete[] swappedBytes;
804 //No need to pad the data, as (1440*1024*2)%2880==0
805
806 //calculate the checksum from the header
807 ostringstream str;
808 str << checksum.val();
809 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
810 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
811 checksum.add(it->fitsString().c_str(), 80);
812 string end("END ");
813 string space(" ");
814 checksum.add(end.c_str(), 80);
815 int headerRowsLeft = 36 - (header.size() + 1)%36;
816 for (int i=0;i<headerRowsLeft;i++)
817 checksum.add(space.c_str(), 80);
818 //udpate the checksum keyword
819 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
820 //and eventually re-write the header data
821 _file.seekp(whereDidIStart);
822 for (uint32_t i=0;i<header.size();i++)
823 _file.write(header[i].fitsString().c_str(), 80);
824 _file.seekp(whereDidIStop);
825}
826
827/****************************************************************
828 * ADD COLUMN
829 ****************************************************************/
830bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
831{
832 if (_totalNumRows != 0)
833 {
834 cout << "Error: cannot add new columns once first row has been written" << endl;
835 return false;
836 }
837 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
838 {
839 if (it->name() == column.name())
840 {
841 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
842 return false;
843 }
844 }
845 _columns.push_back(column);
846 _columns.back().setOffset(_rowWidth);
847 _rowWidth += column.width();
848 reallocateBuffers();
849
850 ostringstream str, str2, str3;
851 str << "TTYPE" << _columns.size();
852 str2 << column.name();
853 str3 << "label for field ";
854 if (_columns.size() < 10) str3 << " ";
855 if (_columns.size() < 100) str3 << " ";
856 str3 << _columns.size();
857 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
858 str.str("");
859 str2.str("");
860 str3.str("");
861 str << "TFORM" << _columns.size();
862 str2 << "1QB";
863 str3 << "data format of field " << _columns.size();
864 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
865 str.str("");
866 str2.str("");
867 str3.str("");
868 str << "ZFORM" << _columns.size();
869 str2 << column.numElems() << column.type();
870 str3 << "Original format of field " << _columns.size();
871 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
872 str.str("");
873 str2.str("");
874 str3.str("");
875 str << "ZCTYP" << _columns.size();
876 str2 << column.getCompressionString();
877 str3 << "Comp. Scheme of field " << _columns.size();
878 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
879 //resize the catalog vector accordingly
880 for (uint32_t i=0;i<_numTiles;i++)
881 {
882 _catalog[i].resize(_columns.size());
883 for (uint32_t j=0;j<_catalog[i].size();j++)
884 _catalog[i][j] = make_pair(0,0);
885 }
886 return true;
887}
888
889/****************************************************************
890 * SET HEADER KEY
891 ****************************************************************/
892bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
893{
894 HeaderEntry ent = entry;
895 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
896 {
897 if (it->key() == entry.key())
898 {
899 if (entry.comment() == "")
900 ent.setComment(it->comment());
901 (*it) = ent;
902 _headerFlushed = false;
903 return true;
904 }
905 }
906 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
907 {
908 if (it->key() == entry.key())
909 {
910 if (entry.comment() == "")
911 ent.setComment(it->comment());
912 (*it) = ent;
913 _headerFlushed = false;
914 return true;
915 }
916 }
917 if (_totalNumRows != 0)
918 {
919 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
920 return false;
921 }
922 _header.push_back(entry);
923 _headerFlushed = false;
924 return true;
925}
926
927/****************************************************************
928 * OPEN
929 ****************************************************************/
930bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
931{
932 _file.open(fileName.c_str(), ios_base::out);
933 if (!_file.is_open())
934 {
935 cout << "Error: Could not open the file (" << fileName << ")." << endl;
936 return false;
937 }
938 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
939 _headerFlushed = false;
940 _threadIndex = 0;
941 //create requested number of threads
942 for (uint32_t i=0;i<_numThreads;i++)
943 pthread_create(&(_thread[i]), NULL, threadFunction, this);
944 //wait for the threads to start
945 while (_numThreads != _threadIndex)
946 usleep(1000);
947 //set the writing fence to the last thread
948 _threadIndex = _numThreads-1;
949 return (_file.good());
950}
951
952/****************************************************************
953 * WRITE HEADER
954 ****************************************************************/
955void CompressedFitsWriter::writeHeader(bool closingFile)
956{
957 if (_headerFlushed)
958 return;
959 if (!_file.is_open())
960 return;
961
962 long cPos = _file.tellp();
963
964 _file.seekp(0);
965
966 _file.write(_fitsHeader.c_str(), 2880);
967
968 //Write the DRS calib table here !
969 writeDrsCalib();
970
971 //we are now at the beginning of the main table. Write its header
972 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
973 _file.write(it->fitsString().c_str(), 80);
974
975 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
976 _file.write(it->fitsString().c_str(), 80);
977
978 _file.write("END ", 80);
979 long here = _file.tellp();
980 if (here%2880)
981 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
982
983 _headerFlushed = true;
984
985 here = _file.tellp();
986
987 if (here%2880)
988 cout << "Error: seems that header did not finish at the end of a block." << endl;
989
990 if (here > cPos && cPos != 0)
991 {
992 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
993 return;
994 }
995
996 here = _file.tellp();
997 writeCatalog(closingFile);
998
999 here = _file.tellp() - here;
1000 _heapPtr = here;
1001
1002 if (cPos != 0)
1003 _file.seekp(cPos);
1004}
1005
1006/****************************************************************
1007 * WRITE CATALOG
1008 * WARNING: writeCatalog is only meant to be used by writeHeader.
1009 * external usage will most likely corrupt the file
1010 ****************************************************************/
1011void CompressedFitsWriter::writeCatalog(bool closingFile)
1012{
1013 uint32_t sizeWritten = 0;
1014 for (uint32_t i=0;i<_catalog.size();i++)
1015 {
1016 for (uint32_t j=0;j<_catalog[i].size();j++)
1017 {
1018 //swap the bytes
1019 int8_t swappedEntry[16];
1020 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1021 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1022 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1023 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1024 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1025 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1026 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1027 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1028
1029 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1030 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1031 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1032 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1033 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1034 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1035 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1036 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1037 if (closingFile)
1038 {
1039 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1040 }
1041 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1042 sizeWritten += 2*sizeof(int64_t);
1043 }
1044 }
1045
1046 //we do not reserve space for now because fverify does not like that.
1047 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1048 return ;
1049
1050 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1051 if (sizeWritten % 2880 != 0)
1052 {
1053 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1054 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1055 }
1056}
1057
1058/****************************************************************
1059 * ADD HEADER CHECKSUM
1060 ****************************************************************/
1061void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1062{
1063 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1064 _checksum.add(it->fitsString().c_str(), 80);
1065 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1066 _checksum.add(it->fitsString().c_str(), 80);
1067 string end("END ");
1068 string space(" ");
1069 checksum.add(end.c_str(), 80);
1070 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1071 for (int i=0;i<headerRowsLeft;i++)
1072 checksum.add(space.c_str(), 80);
1073}
1074
1075/****************************************************************
1076 * CLOSE
1077 ****************************************************************/
1078bool CompressedFitsWriter::close()
1079{
1080 for (uint32_t i=0;i<_numThreads;i++)
1081 while (_threadStatus[i] != _THREAD_WAIT_)
1082 usleep(100000);
1083 for (uint32_t i=0;i<_numThreads;i++)
1084 _threadStatus[i] = _THREAD_EXIT_;
1085 for (uint32_t i=0;i<_numThreads;i++)
1086 pthread_join(_thread[i], NULL);
1087 //flush the rows that were not written yet
1088 if (_totalNumRows%_numRowsPerTile != 0)
1089 {
1090 copyTransposeTile(0);
1091
1092 _threadNumRows[0] = _totalNumRows;
1093 uint32_t numBytes = compressBuffer(0);
1094 writeCompressedDataToDisk(0, numBytes);
1095 }
1096 //compression stuff
1097 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1098 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1099 //TODO calculate the real offset from the main table to the start of the HEAP data area
1100 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1101 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1102
1103 //regular stuff
1104 if (_catalog.size() > 0)
1105 {
1106 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1107 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1108 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1109 int64_t heapSize = 0;
1110 int64_t compressedOffset = 0;
1111 for (uint32_t i=0;i<_catalog.size();i++)
1112 {
1113 compressedOffset += sizeof(TileHeader);
1114 heapSize += sizeof(TileHeader);
1115 for (uint32_t j=0;j<_catalog[i].size();j++)
1116 {
1117 heapSize += _catalog[i][j].first;
1118 //set the catalog offsets to their actual values
1119 _catalog[i][j].second = compressedOffset;
1120 compressedOffset += _catalog[i][j].first;
1121 //special case if entry has zero length
1122 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1123 }
1124 }
1125 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1126 }
1127 else
1128 {
1129 setHeaderKey(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
1130 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1131 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1132 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1133 }
1134 writeHeader(true);
1135 ostringstream str;
1136 str << _checksum.val();
1137
1138 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1139 addHeaderChecksum(_checksum);
1140 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1141 //update header value
1142 writeHeader();
1143 //update file length
1144 long here = _file.tellp();
1145 if (here%2880)
1146 {
1147 vector<char> nullVec(2880 - here%2880, 0);
1148 _file.write(nullVec.data(), 2880 - here%2880);
1149 }
1150 _file.close();
1151 return true;
1152}
1153
1154/****************************************************************
1155 * COPY TRANSPOSE TILE
1156 ****************************************************************/
1157void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1158{
1159 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1160 //copy the tile and transpose it
1161 uint32_t offset = 0;
1162 for (uint32_t i=0;i<_columns.size();i++)
1163 {
1164 switch (_columns[i].getColumnOrdering())//getCompression())
1165 {
1166 case FACT_ROW_MAJOR:
1167 for (uint32_t k=0;k<thisRoundNumRows;k++)
1168 {//regular, "semi-transposed" copy
1169 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1170 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1171 }
1172 break;
1173
1174 case FACT_COL_MAJOR :
1175 for (int j=0;j<_columns[i].numElems();j++)
1176 for (uint32_t k=0;k<thisRoundNumRows;k++)
1177 {//transposed copy
1178 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1179 offset += _columns[i].sizeOfElems();
1180 }
1181 break;
1182 default:
1183 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1184
1185 };
1186 }
1187}
1188
1189/****************************************************************
1190 * WRITE BINARY ROW
1191 ****************************************************************/
1192bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1193{
1194 if (_totalNumRows == 0)
1195 writeHeader();
1196
1197 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1198 _totalNumRows++;
1199 if (_totalNumRows%_numRowsPerTile == 0)
1200 {
1201 //which is the next thread that we should use ?
1202 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1203 usleep(100000);
1204
1205 copyTransposeTile(_threadLooper);
1206
1207 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1208 usleep(100000);
1209
1210 _threadNumRows[_threadLooper] = _totalNumRows;
1211 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1212 _threadLooper = (_threadLooper+1)%_numThreads;
1213 }
1214 return _file.good();
1215}
1216
1217/****************************************************************
1218 * COMPRESS BUFFER
1219 ****************************************************************/
1220uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1221{
1222 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1223 return numRows*sizeOfElems*numRowElems;
1224}
1225
1226/****************************************************************
1227 * COMPRESS BUFFER
1228 ****************************************************************/
1229uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1230{
1231 string huffmanOutput;
1232 uint32_t previousHuffmanSize = 0;
1233 if (numRows < 2)
1234 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1235 return numRows*sizeOfElems*numRowElems + 1000;
1236 }
1237 if (sizeOfElems < 2 )
1238 {
1239 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1240 return 0;
1241 }
1242 uint32_t huffmanOffset = 0;
1243 for (uint32_t j=0;j<numRowElems;j++)
1244 {
1245 Huffman::Encode(huffmanOutput,
1246 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1247 numRows*(sizeOfElems/2));
1248 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1249 huffmanOffset += sizeof(uint32_t);
1250 previousHuffmanSize = huffmanOutput.size();
1251 }
1252 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1253
1254 //only copy if not larger than not-compressed size
1255 if (totalSize < numRows*sizeOfElems*numRowElems)
1256 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1257
1258 return totalSize;
1259}
1260
1261/****************************************************************
1262 * COMPRESS BUFFER
1263 ****************************************************************/
1264uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1265{
1266 uint32_t colWidth = numRowElems;
1267 for (int j=colWidth*numRows-1;j>1;j--)
1268 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1269 //call the huffman transposed
1270 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1271}
1272
1273uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1274{
1275 uint32_t colWidth = numRowElems;
1276 for (int j=colWidth*numRows-1;j>1;j--)
1277 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1278
1279 return numRows*sizeOfElems*numRowElems;
1280}
1281/****************************************************************
1282 * COMPRESS BUFFER
1283 ****************************************************************/
1284uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1285{
1286 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1287 uint32_t offset=0;
1288 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1289 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1290
1291 //now compress each column one by one by calling compression on arrays
1292 for (uint32_t i=0;i<_columns.size();i++)
1293 {
1294 _catalog[currentCatalogRow][i].second = compressedOffset;
1295
1296 if (_columns[i].numElems() == 0) continue;
1297
1298 BlockHeader& head = _columns[i].getBlockHeader();
1299 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1300 //set the default byte telling if uncompressed the compressed Flag
1301 uint64_t previousOffset = compressedOffset;
1302 //skip header data
1303 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1304
1305 for (uint32_t j=0;j<sequence.size(); j++)
1306 {
1307 switch (sequence[j])
1308 {
1309 case FACT_RAW:
1310// if (head.numProcs == 1)
1311 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1312 break;
1313 case FACT_SMOOTHING:
1314 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1315 break;
1316 case FACT_HUFFMAN16:
1317 if (head.ordering == FACT_COL_MAJOR)
1318 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1319 else
1320 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1321 break;
1322 default:
1323 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1324 break;
1325 }
1326 }
1327
1328 //check if compressed size is larger than uncompressed
1329 if (sequence[0] != FACT_RAW &&
1330 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1331 {//if so set flag and redo it uncompressed
1332 cout << "REDOING UNCOMPRESSED" << endl;
1333 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1334 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1335 BlockHeader he;
1336 he.size = compressedOffset - previousOffset;
1337 he.numProcs = 1;
1338 he.ordering = FACT_ROW_MAJOR;
1339 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1340 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FACT_RAW;
1341 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1342 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1343 continue;
1344 }
1345 head.size = compressedOffset - previousOffset;
1346 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1347 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1348
1349 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1350 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1351 }
1352
1353 TileHeader tHead(thisRoundNumRows, compressedOffset);
1354 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1355 return compressedOffset;
1356}
1357
1358/****************************************************************
1359 * WRITE COMPRESS DATA TO DISK
1360 ****************************************************************/
1361bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1362{
1363 char* checkSumPointer = _compressedBuffer[threadID];
1364 int32_t extraBytes = 0;
1365 uint32_t sizeToChecksum = sizeToWrite;
1366 if (_checkOffset != 0)
1367 {//should we extend the array to the left ?
1368 sizeToChecksum += _checkOffset;
1369 checkSumPointer -= _checkOffset;
1370 memset(checkSumPointer, 0, _checkOffset);
1371 }
1372 if (sizeToChecksum%4 != 0)
1373 {//should we extend the array to the right ?
1374 extraBytes = 4 - (sizeToChecksum%4);
1375 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1376 sizeToChecksum += extraBytes;
1377 }
1378 //do the checksum
1379 _checksum.add(checkSumPointer, sizeToChecksum);
1380 _checkOffset = (4 - extraBytes)%4;
1381 //write data to disk
1382 _file.write(_compressedBuffer[threadID], sizeToWrite);
1383 return _file.good();
1384}
1385
1386/****************************************************************
1387 * WRITER THREAD LOOP
1388 ****************************************************************/
1389void* CompressedFitsWriter::threadFunction(void* context)
1390{
1391 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1392
1393 uint32_t myID = 0;
1394 pthread_mutex_lock(&(myself->_mutex));
1395 myID = myself->_threadIndex++;
1396 pthread_mutex_unlock(&(myself->_mutex));
1397 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1398
1399 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1400 {
1401 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1402 usleep(100000);
1403 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1404 continue;
1405 uint32_t numBytes = myself->compressBuffer(myID);
1406 myself->_threadStatus[myID] = _THREAD_WRITE_;
1407
1408 //wait for the previous data to be written
1409 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1410 usleep(1000);
1411 //do the actual writing to disk
1412 pthread_mutex_lock(&(myself->_mutex));
1413 myself->writeCompressedDataToDisk(myID, numBytes);
1414 myself->_threadIndex = myID;
1415 pthread_mutex_unlock(&(myself->_mutex));
1416 myself->_threadStatus[myID] = _THREAD_WAIT_;
1417 }
1418 return NULL;
1419}
1420
1421/****************************************************************
1422 * PRINT USAGE
1423 ****************************************************************/
1424void printUsage()
1425{
1426 cout << endl;
1427 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1428 " and compresses it.\n It can use a drs calibration in order to"
1429 " improve the compression ratio. If so, the input calibration"
1430 " is embedded into the compressed file.\n"
1431 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1432 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1433 "Usage: Compressed_Fits_Test <inputFile>";
1434 cout << endl;
1435}
1436
1437/****************************************************************
1438 * PRINT HELP
1439 ****************************************************************/
1440void printHelp()
1441{
1442 cout << endl;
1443 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1444 "The fz extension will be added, replacing the .gz one if required \n"
1445 "If output is specified, then it will replace the automatically generated output filename\n"
1446 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1447 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1448 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1449 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1450 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1451 "UNCOMPRESSED\n"
1452 "AMPLITUDE\n"
1453 "HUFFMAN\n"
1454 "SMOOTHMAN\n"
1455 "INT_WAVELET\n"
1456 "\n"
1457 "--quiet removes any textual output, except error messages\n"
1458 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1459 ;
1460 cout << endl << endl;
1461}
1462
1463/****************************************************************
1464 * SETUP CONFIGURATION
1465 ****************************************************************/
1466void setupConfiguration(Configuration& conf)
1467{
1468 po::options_description configs("FitsCompressor options");
1469 configs.add_options()
1470 ("inputFile,i", vars<string>(), "Input file")
1471 ("drs,d", var<string>(), "Input drs calibration file")
1472 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1473 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1474 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1475 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1476 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1477 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1478 ;
1479 po::positional_options_description positional;
1480 positional.add("inputFile", -1);
1481 conf.AddOptions(configs);
1482 conf.SetArgumentPositions(positional);
1483}
1484
1485/****************************************************************
1486 * MAIN
1487 ****************************************************************/
1488int main(int argc, const char** argv)
1489{
1490 Configuration conf(argv[0]);
1491 conf.SetPrintUsage(printUsage);
1492 setupConfiguration(conf);
1493
1494 if (!conf.DoParse(argc, argv, printHelp))
1495 return -1;
1496
1497 //initialize the file names to nothing.
1498 string fileNameIn = "";
1499 string fileNameOut = "";
1500 string drsFileName = "";
1501 uint32_t numRowsPerTile = 100;
1502 bool displayText=true;
1503
1504 //parse configuration
1505 if (conf.Get<bool>("quiet")) displayText = false;
1506 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1507 if (inputFileNameVec.size() != 1)
1508 {
1509 cout << "Error: ";
1510 if (inputFileNameVec.size() == 0) cout << "no";
1511 else cout << inputFileNameVec.size();
1512 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1513 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1514 cout << inputFileNameVec[i] << endl;
1515 return -1;
1516 }
1517
1518 //Assign the input filename
1519 fileNameIn = inputFileNameVec[0];
1520
1521 //Check if we have a drs calib too
1522 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1523
1524 //Should we verify the data ?
1525 bool verifyDataPlease = false;
1526 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1527
1528
1529 //should we use a specific output filename ?
1530 if (conf.Has("output"))
1531 fileNameOut = conf.Get<string>("output");
1532 else
1533 {
1534 size_t pos = fileNameIn.find(".fits");
1535 if (pos == string::npos)
1536 {
1537 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1538 return -1;
1539 }
1540 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1541 }
1542
1543 //should we use specific compression on some columns ?
1544 const vector<string> columnsCompression = conf.Vec<string>("compression");
1545
1546 //split up values between column names and compression scheme
1547 vector<std::pair<string, string>> compressions;
1548 for (unsigned int i=0;i<columnsCompression.size();i++)
1549 {
1550 size_t pos = columnsCompression[i].find_first_of("=");
1551 if (pos == string::npos)
1552 {
1553 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1554 return -1;
1555 }
1556 string comp = columnsCompression[i].substr(pos+1);
1557 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1558 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1559 {
1560 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1561 return -1;
1562 }
1563 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1564 }
1565
1566 //How many rows per tile should we use ?
1567 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1568
1569 /************************************************************************************
1570 * Done reading configuration. Open relevant files
1571 ************************************************************************************/
1572
1573 //Open input's fits file
1574 factfits inFile(fileNameIn);
1575
1576 if (inFile.IsCompressedFITS())
1577 {
1578 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1579 return -1;
1580 }
1581
1582 //decide how many tiles should be put in the compressed file
1583 uint32_t originalNumRows = inFile.GetNumRows();
1584 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1585 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1586
1587 //should we use a specific number of threads for compressing ?
1588 unsigned int numThreads = 1;
1589 if (conf.Has("threads"))
1590 {
1591 numThreads = conf.Get<unsigned int>("threads");
1592 outFile.setNumWorkingThreads(numThreads);
1593 }
1594
1595 if (!outFile.open(fileNameOut))
1596 {
1597 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1598 return -1;
1599 }
1600
1601 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1602 factfits* drsFile = NULL;
1603 //try to open the Drs file. If any.
1604 if (drsFileName != "")
1605 {
1606 try
1607 {
1608 drsFile = new factfits(drsFileName);
1609 }
1610 catch (...)
1611 {
1612 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1613 return -1;
1614 }
1615 }
1616
1617 if (displayText)
1618 {
1619 cout << endl;
1620 cout << "**********************" << endl;
1621 cout << "Will compress from : " << fileNameIn << endl;
1622 cout << "to : " << fileNameOut << endl;
1623 if (drsFileName != "")
1624 cout << "while calibrating with: " << drsFileName << endl;
1625 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1626 cout << "Data will be verified : ";
1627 if (verifyDataPlease)
1628 cout << "yes" << endl;
1629 else
1630 cout << "no (WARNING !)" << endl;
1631 cout << "**********************" << endl;
1632 cout << endl;
1633 }
1634
1635 /************************************************************************************
1636 * Done opening input files. Allocate memory and configure output file
1637 ************************************************************************************/
1638
1639 //allocate the buffer for temporary storage of each read/written row
1640 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1641 char* buffer = new char[rowWidth];
1642
1643 //get the source columns
1644 const fits::Table::Columns& columns = inFile.GetColumns();
1645 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1646 if (displayText)
1647 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1648
1649 //Add columns.
1650 for (uint32_t i=0;i<sortedColumns.size(); i++)
1651 {
1652 //get column name
1653 ostringstream str;
1654 str << "TTYPE" << i+1;
1655 string colName = inFile.GetStr(str.str());
1656 if (displayText)
1657 {
1658 cout << "Column " << colName;
1659 for (uint32_t j=colName.size();j<21;j++)
1660 cout << " ";
1661 cout << " -> ";
1662 }
1663
1664 //get header structures
1665 BlockHeader rawHeader;
1666 BlockHeader smoothmanHeader(0, FACT_ROW_MAJOR, 2);
1667 vector<uint16_t> rawProcessings(1);
1668 rawProcessings[0] = FACT_RAW;
1669 vector<uint16_t> smoothmanProcessings(2);
1670 smoothmanProcessings[0] = FACT_SMOOTHING;
1671 smoothmanProcessings[1] = FACT_HUFFMAN16;
1672// smoothmanProcessings[2] = FACT_RAW;
1673
1674 //first lets see if we do have an explicit request
1675 bool explicitRequest = false;
1676 for (unsigned int j=0;j<compressions.size();j++)
1677 {
1678 if (compressions[j].first == colName)
1679 {
1680 explicitRequest = true;
1681 if (displayText) cout << compressions[j].second << endl;
1682 if (compressions[j].second == "UNCOMPRESSED")
1683 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1684 if (compressions[j].second == "SMOOTHMAN")
1685 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1686 break;
1687 }
1688 }
1689
1690 if (explicitRequest) continue;
1691
1692 if (colName != "Data")
1693 {
1694 if (displayText) cout << "UNCOMPRESSED" << endl;
1695 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1696 }
1697 else
1698 {
1699 if (displayText) cout << "SMOOTHMAN" << endl;
1700 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1701 }
1702 }
1703
1704 //translate original header entries to their Z-version
1705 const fits::Table::Keys& header = inFile.GetKeys();
1706 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1707 {
1708 string k = i->first;//header[i].key();
1709 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1710 continue;
1711 if (k == "CHECKSUM")
1712 {
1713 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1714 continue;
1715 }
1716 if (k == "DATASUM")
1717 {
1718 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1719 continue;
1720 }
1721 k = k.substr(0,5);
1722 if (k == "TTYPE" || k == "TFORM")
1723 {
1724 string tmpKey = i->second.fitsString;
1725 tmpKey[0] = 'Z';
1726 outFile.setHeaderKey(tmpKey);
1727 continue;
1728 }
1729 if (k == "NAXIS")
1730 continue;
1731 outFile.setHeaderKey(i->second.fitsString);
1732 }
1733
1734 //deal with the DRS calib
1735 int16_t* drsCalib16 = NULL;
1736
1737 //load the drs calib. data
1738 int32_t startCellOffset = -1;
1739 if (drsFileName != "")
1740 {
1741 drsCalib16 = new int16_t[1440*1024];
1742 float* drsCalibFloat = NULL;
1743 try
1744 {
1745 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1746 }
1747 catch (...)
1748 {
1749 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1750 return -1;
1751 }
1752
1753 //read the calibration and calculate its integer value
1754 drsFile->GetNextRow();
1755 for (uint32_t i=0;i<1440*1024;i++)
1756 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1757
1758 //assign it to the ouput file
1759 outFile.setDrsCalib(drsCalib16);
1760
1761 //get the start cells offsets
1762 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1763 if (it->first == "StartCellData")
1764 {
1765 startCellOffset = it->second.offset;
1766 if (it->second.type != 'I')
1767 {
1768 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1769 return -1;
1770 }
1771 }
1772
1773 if (startCellOffset == -1)
1774 {
1775 cout << "Could not find StartCellData in input file " << fileNameIn << ". Aborting."<< endl;
1776 return -1;
1777 }
1778 }
1779
1780 /************************************************************************************
1781 * Done configuring compression. Do the real job now !
1782 ************************************************************************************/
1783
1784 if (displayText) cout << "Converting file..." << endl;
1785
1786 int numSlices = -1;
1787 int32_t dataOffset = -1;
1788
1789 //Get the pointer to the column that must be drs-calibrated
1790 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1791 if (it->first == "Data")
1792 {
1793 numSlices = it->second.num;
1794 dataOffset = it->second.offset;
1795 }
1796 if (numSlices % 1440 != 0)
1797 {
1798 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1799 return -1;
1800 }
1801 if (dataOffset == -1)
1802 {
1803 cout << "Could not find the column Data in the input file. Aborting." << endl;
1804 return -1;
1805 }
1806
1807 numSlices /= 1440;
1808
1809 //set pointers to the readout data to later be able to gather it to "buffer".
1810 vector<void*> readPointers;
1811 vector<int32_t> readOffsets;
1812 vector<int32_t> readElemSize;
1813 vector<int32_t> readNumElems;
1814 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1815 {
1816 readPointers.push_back(inFile.SetPtrAddress(it->first));
1817 readOffsets.push_back(it->second.offset);
1818 readElemSize.push_back(it->second.size);
1819 readNumElems.push_back(it->second.num);
1820 }
1821
1822 //Convert each row one after the other
1823 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1824 {
1825 if (displayText) cout << "\r Row " << i+1 << flush;
1826 inFile.GetNextRow();
1827 //copy from inFile internal structures to buffer
1828 int32_t count=0;
1829 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1830 {
1831 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1832 count++;
1833 }
1834
1835 if (startCellOffset != -1)
1836 {//drs calibrate this data
1837 for (int j=0;j<1440;j++)
1838 {
1839 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1840 if (thisStartCell < 0) continue;
1841 for (int k=0;k<numSlices;k++)
1842 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1843 }
1844 }
1845 outFile.writeBinaryRow(buffer);
1846 };
1847
1848 //Get table name for later use in case the compressed file is to be verified
1849 string tableName = inFile.GetStr("EXTNAME");
1850
1851 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1852
1853 inFile.close();
1854 if (!outFile.close())
1855 {
1856 cout << "Something went wrong while writing the catalog: negative index" << endl;
1857 return false;
1858 }
1859
1860 delete[] drsCalib16;
1861
1862 if (displayText) cout << "Done." << endl;
1863
1864 /************************************************************************************
1865 * Actual job done. Should we verify what we did ?
1866 ************************************************************************************/
1867
1868 if (verifyDataPlease)
1869 {
1870 if (displayText) cout << "Now verify data..." << endl;
1871 }
1872 else
1873 return 0;
1874
1875 //get a compressed reader
1876//TEMP try to copy the file too
1877// string copyName("/scratch/copyFile.fz");
1878 string copyName("");
1879 factfits verifyFile(fileNameOut, copyName, tableName, false);
1880
1881 //and the header of the compressed file
1882 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1883
1884 //get a non-compressed writer
1885 ofits reconstructedFile;
1886
1887 //figure out its name: /dev/null unless otherwise specified
1888 string reconstructedName = fileNameOut+".recons";
1889 reconstructedName = "/dev/null";
1890 reconstructedFile.open(reconstructedName.c_str(), false);
1891
1892 //reconstruct the original columns from the compressed file.
1893 string origChecksumStr;
1894 string origDatasum;
1895
1896 //reset tablename value so that it is re-read from compressed table's header
1897 tableName = "";
1898
1899 /************************************************************************************
1900 * Reconstruction setup done. Rebuild original header
1901 ************************************************************************************/
1902
1903 //re-tranlate the keys
1904 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1905 {
1906 string k = it->first;
1907 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1908 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1909 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1910 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1911 {
1912 continue;
1913 }
1914
1915 if (k == "ZCHKSUM")
1916 {
1917 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1918 origChecksumStr = it->second.value;
1919 continue;
1920 }
1921
1922 if (k == "ZDTASUM")
1923 {
1924 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1925 origDatasum = it->second.value;
1926 continue;
1927 }
1928
1929 if (k == "EXTNAME")
1930 {
1931 tableName = it->second.value;
1932 }
1933
1934 k = k.substr(0,5);
1935
1936 if (k == "TTYPE")
1937 {//we have an original column name here.
1938 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1939 continue;
1940 }
1941
1942 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1943 {
1944 continue;
1945 }
1946
1947 if (k == "ZFORM" || k == "ZTYPE")
1948 {
1949 string tmpKey = it->second.fitsString;
1950 tmpKey[0] = 'T';
1951 reconstructedFile.SetKeyFromFitsString(tmpKey);
1952 continue;
1953 }
1954
1955 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1956 }
1957
1958 if (tableName == "")
1959 {
1960 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1961 return -1;
1962 }
1963
1964 //Restore the original columns
1965 for (uint32_t numCol=1; numCol<10000; numCol++)
1966 {
1967 ostringstream str;
1968 str << numCol;
1969 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
1970
1971 string ttype = verifyFile.GetStr("TTYPE"+str.str());
1972 string tform = verifyFile.GetStr("ZFORM"+str.str());
1973 char type = tform[tform.size()-1];
1974 string number = tform.substr(0, tform.size()-1);
1975 int numElems = atoi(number.c_str());
1976
1977 if (number == "") numElems=1;
1978
1979 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1980 }
1981
1982 reconstructedFile.WriteTableHeader(tableName.c_str());
1983
1984 /************************************************************************************
1985 * Original header restored. Do the data
1986 ************************************************************************************/
1987
1988 //set pointers to the readout data to later be able to gather it to "buffer".
1989 readPointers.clear();
1990 readOffsets.clear();
1991 readElemSize.clear();
1992 readNumElems.clear();
1993 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1994 {
1995 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
1996 readOffsets.push_back(it->second.offset);
1997 readElemSize.push_back(it->second.size);
1998 readNumElems.push_back(it->second.num);
1999 }
2000
2001 //do the actual reconstruction work
2002 uint32_t i=1;
2003 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2004 {
2005 int count=0;
2006 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2007 {
2008 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2009 count++;
2010 }
2011 if (displayText) cout << "\r Row " << i << flush;
2012 reconstructedFile.WriteRow(buffer, rowWidth);
2013 i++;
2014 }
2015
2016 if (displayText) cout << endl;
2017
2018 //close reconstruction input and output
2019// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2020// verifyFile.close();
2021 reconstructedFile.close();
2022
2023 //get original and reconstructed checksum and datasum
2024 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2025 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2026
2027 //verify that no mistake was made
2028 if (origChecksum.second != newChecksum.second)
2029 {
2030 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2031 return -1;
2032 }
2033 if (origChecksum.first != newChecksum.first)
2034 {
2035 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2036 }
2037 else
2038 {
2039 if (displayText) cout << "Ok" << endl;
2040 }
2041
2042 delete[] buffer;
2043 return 0;
2044
2045}
2046
Note: See TracBrowser for help on using the repository browser.