source: trunk/FACT++/src/fitsCompressor.cc@ 16887

Last change on this file since 16887 was 16882, checked in by lyard, 11 years ago
added correct checksum calculation for compressed fits reader (zfits)
File size: 85.2 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=FACT_ROW_MAJOR,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 ///@brief open a new fits file
427 bool open(const string& fileName, const string& tableName="Data");
428
429 ///@brief close the opened file
430 bool close();
431
432 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
433 bool writeBinaryRow(const char* bufferToWrite);
434
435 ///@brief assign a given (already loaded) drs calibration
436 void setDrsCalib(int16_t* data);
437
438 ///@brief set the number of worker threads compressing the data.
439 bool setNumWorkingThreads(uint32_t num);
440
441 private:
442 ///@brief compresses one buffer of data, the one given by threadIndex
443 uint64_t compressBuffer(uint32_t threadIndex);
444
445 ///@brief writes an already compressed buffer to disk
446 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
447
448 ///@brief add the header checksum to the datasum
449 void addHeaderChecksum(Checksum& checksum);
450
451 ///@brief write the header. If closingFile is set to true, checksum is calculated
452 void writeHeader(bool closingFile = false);
453
454 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
455 void writeCatalog(bool closingFile=false);
456
457 /// FIXME this was a bad idea. Move everything to the regular header
458 vector<HeaderEntry> _defaultHeader;
459
460 /// the main function compressing the data
461 static void* threadFunction(void* context);
462
463 /// Write the drs calibration to disk, if any
464 void writeDrsCalib();
465
466 /// Copy and transpose (or semi-transpose) one tile of data
467 void copyTransposeTile(uint32_t index);
468
469 /// Specific compression functions
470 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
471 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
472 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
473 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
474
475 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
476 int16_t* _drsCalibData; ///< array of the Drs baseline mean
477 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
478 pthread_mutex_t _mutex; ///< mutex for compressing threads
479
480
481 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
482 static string _fitsHeader; ///< the default header to be written in every fits file
483};
484
485const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
486const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
487const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
488const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
489const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
490const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
491
492template<>
493void CompressedFitsFile::HeaderEntry::setValue(const string& v)
494{
495 string val = v;
496 if (val.size() > 2 && val[0] == '\'')
497 {
498 size_t pos = val.find_last_of("'");
499 if (pos != string::npos && pos != 0)
500 val = val.substr(1, pos-1);
501 }
502 ostringstream str;
503
504 str << "'" << val << "'";
505 for (int i=str.str().length(); i<20;i++)
506 str << " ";
507 _value = str.str();
508 buildFitsString();
509}
510
511/**
512 * Default header to be written in all fits files
513 */
514string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
515 "BITPIX = 8 / number of bits per data pixel "
516 "NAXIS = 0 / number of data axes "
517 "EXTEND = T / FITS dataset may contain extensions "
518 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
519 "DATASUM = ' 0' / Checksum for the data block "
520 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
521 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
522 "END "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " ";
550
551/**
552 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
553 *
554 */
555string CompressedFitsWriter::_emptyBlock = " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " ";
591
592CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
593/****************************************************************
594 * SUPER CLASS DEFAULT CONSTRUCTOR
595 ****************************************************************/
596CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
597 uint32_t numRowsPerTile) : _header(),
598 _columns(),
599 _numTiles(numTiles),
600 _numRowsPerTile(numRowsPerTile),
601 _totalNumRows(0),
602 _rowWidth(0),
603 _headerFlushed(false),
604 _buffer(NULL),
605 _checksum(0),
606 _heapPtr(0),
607 _transposedBuffer(1),
608 _compressedBuffer(1),
609 _numThreads(1),
610 _threadIndex(0),
611 _thread(1),
612 _threadNumRows(1),
613 _threadStatus(1)
614{
615 _catalog.resize(_numTiles);
616 _transposedBuffer[0] = NULL;
617 _compressedBuffer[0] = NULL;
618 _threadStatus[0] = _THREAD_WAIT_;
619 _threadNumRows[0] = 0;
620}
621
622/****************************************************************
623 * SUPER CLASS DEFAULT DESTRUCTOR
624 ****************************************************************/
625CompressedFitsFile::~CompressedFitsFile()
626{
627 if (_buffer != NULL)
628 {
629 delete[] _buffer;
630 _buffer = NULL;
631 for (uint32_t i=0;i<_numThreads;i++)
632 {
633 _compressedBuffer[i] = _compressedBuffer[i]-4;
634 delete[] _transposedBuffer[i];
635 delete[] _compressedBuffer[i];
636 _transposedBuffer[i] = NULL;
637 _compressedBuffer[i] = NULL;
638 }
639 }
640 if (_file.is_open())
641 _file.close();
642}
643
644/****************************************************************
645 * REALLOCATE BUFFER
646 ****************************************************************/
647bool CompressedFitsFile::reallocateBuffers()
648{
649 if (_buffer)
650 {
651 delete[] _buffer;
652 for (uint32_t i=0;i<_compressedBuffer.size();i++)
653 {
654 _compressedBuffer[i] = _compressedBuffer[i]-4;
655 delete[] _transposedBuffer[i];
656 delete[] _compressedBuffer[i];
657 }
658 }
659 _buffer = new char[_rowWidth*_numRowsPerTile];
660 if (_buffer == NULL) return false;
661 if (_compressedBuffer.size() != _numThreads)
662 {
663 _transposedBuffer.resize(_numThreads);
664 _compressedBuffer.resize(_numThreads);
665 }
666 for (uint32_t i=0;i<_numThreads;i++)
667 {
668 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
669 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader)]; //use a bit more memory for compression flags
670 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
671 return false;
672 //shift the compressed buffer by 4 bytes, for checksum calculation
673 memset(_compressedBuffer[i], 0, 4);
674 _compressedBuffer[i] = _compressedBuffer[i]+4;
675 //initialize the tile header
676 TileHeader tileHeader;
677 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
678 }
679 return true;
680}
681
682/****************************************************************
683 * DEFAULT WRITER CONSTRUCTOR
684 ****************************************************************/
685CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
686 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
687 _checkOffset(0),
688 _drsCalibData(NULL),
689 _threadLooper(0)
690{
691 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
692 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
693 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
694 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
695 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
696 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
697 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
698 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
699 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
700 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
701 //compression stuff
702 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
703 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
704 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
705 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
706 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
707 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
708 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
709
710 pthread_mutex_init(&_mutex, NULL);
711}
712
713/****************************************************************
714 * DEFAULT DESTRUCTOR
715 ****************************************************************/
716CompressedFitsWriter::~CompressedFitsWriter()
717{
718 pthread_mutex_destroy(&_mutex);
719}
720
721/****************************************************************
722 * SET THE POINTER TO THE DRS CALIBRATION
723 ****************************************************************/
724void CompressedFitsWriter::setDrsCalib(int16_t* data)
725{
726 _drsCalibData = data;
727}
728
729/****************************************************************
730 * SET NUM WORKING THREADS
731 ****************************************************************/
732bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
733{
734 if (_file.is_open())
735 return false;
736 if (num < 1 || num > 64)
737 {
738 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
739 return false;
740 }
741 _numThreads = num;
742 _transposedBuffer[0] = NULL;
743 _compressedBuffer[0] = NULL;
744 _threadStatus.resize(num);
745 _thread.resize(num);
746 _threadNumRows.resize(num);
747 for (uint32_t i=0;i<num;i++)
748 {
749 _threadNumRows[i] = 0;
750 _threadStatus[i] = _THREAD_WAIT_;
751 }
752 return reallocateBuffers();
753}
754
755/****************************************************************
756 * WRITE DRS CALIBRATION TO FILE
757 ****************************************************************/
758void CompressedFitsWriter::writeDrsCalib()
759{
760 //if file was not loaded, ignore
761 if (_drsCalibData == NULL)
762 return;
763 uint64_t whereDidIStart = _file.tellp();
764 vector<HeaderEntry> header;
765 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
766 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
767 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
768 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
769 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
770 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
771 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
772 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
773 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
774 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
775 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
776 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
777 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
778
779 for (uint32_t i=0;i<header.size();i++)
780 _file.write(header[i].fitsString().c_str(), 80);
781 //End the header
782 _file.write("END ", 80);
783 long here = _file.tellp();
784 if (here%2880)
785 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
786 //now write the data itself
787 int16_t* swappedBytes = new int16_t[1024];
788 Checksum checksum;
789 for (int32_t i=0;i<1440;i++)
790 {
791 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
792 for (int32_t j=0;j<2048;j+=2)
793 {
794 int8_t inter;
795 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
796 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
797 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
798 }
799 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
800 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
801 }
802 uint64_t whereDidIStop = _file.tellp();
803 delete[] swappedBytes;
804 //No need to pad the data, as (1440*1024*2)%2880==0
805
806 //calculate the checksum from the header
807 ostringstream str;
808 str << checksum.val();
809 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
810 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
811 checksum.add(it->fitsString().c_str(), 80);
812 string end("END ");
813 string space(" ");
814 checksum.add(end.c_str(), 80);
815 int headerRowsLeft = 36 - (header.size() + 1)%36;
816 for (int i=0;i<headerRowsLeft;i++)
817 checksum.add(space.c_str(), 80);
818 //udpate the checksum keyword
819 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
820 //and eventually re-write the header data
821 _file.seekp(whereDidIStart);
822 for (uint32_t i=0;i<header.size();i++)
823 _file.write(header[i].fitsString().c_str(), 80);
824 _file.seekp(whereDidIStop);
825}
826
827/****************************************************************
828 * ADD COLUMN
829 ****************************************************************/
830bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
831{
832 if (_totalNumRows != 0)
833 {
834 cout << "Error: cannot add new columns once first row has been written" << endl;
835 return false;
836 }
837 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
838 {
839 if (it->name() == column.name())
840 {
841 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
842 return false;
843 }
844 }
845 _columns.push_back(column);
846 _columns.back().setOffset(_rowWidth);
847 _rowWidth += column.width();
848 reallocateBuffers();
849
850 ostringstream str, str2, str3;
851 str << "TTYPE" << _columns.size();
852 str2 << column.name();
853 str3 << "label for field ";
854 if (_columns.size() < 10) str3 << " ";
855 if (_columns.size() < 100) str3 << " ";
856 str3 << _columns.size();
857 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
858 str.str("");
859 str2.str("");
860 str3.str("");
861 str << "TFORM" << _columns.size();
862 str2 << "1QB";
863 str3 << "data format of field " << _columns.size();
864 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
865 str.str("");
866 str2.str("");
867 str3.str("");
868 str << "ZFORM" << _columns.size();
869 str2 << column.numElems() << column.type();
870 str3 << "Original format of field " << _columns.size();
871 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
872 str.str("");
873 str2.str("");
874 str3.str("");
875 str << "ZCTYP" << _columns.size();
876 str2 << column.getCompressionString();
877 str3 << "Comp. Scheme of field " << _columns.size();
878 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
879 //resize the catalog vector accordingly
880 for (uint32_t i=0;i<_numTiles;i++)
881 {
882 _catalog[i].resize(_columns.size());
883 for (uint32_t j=0;j<_catalog[i].size();j++)
884 _catalog[i][j] = make_pair(0,0);
885 }
886 return true;
887}
888
889/****************************************************************
890 * SET HEADER KEY
891 ****************************************************************/
892bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
893{
894 HeaderEntry ent = entry;
895 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
896 {
897 if (it->key() == entry.key())
898 {
899 if (entry.comment() == "")
900 ent.setComment(it->comment());
901 (*it) = ent;
902 _headerFlushed = false;
903 return true;
904 }
905 }
906 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
907 {
908 if (it->key() == entry.key())
909 {
910 if (entry.comment() == "")
911 ent.setComment(it->comment());
912 (*it) = ent;
913 _headerFlushed = false;
914 return true;
915 }
916 }
917 if (_totalNumRows != 0)
918 {
919 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
920 return false;
921 }
922 _header.push_back(entry);
923 _headerFlushed = false;
924 return true;
925}
926
927/****************************************************************
928 * OPEN
929 ****************************************************************/
930bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
931{
932 _file.open(fileName.c_str(), ios_base::out);
933 if (!_file.is_open())
934 {
935 cout << "Error: Could not open the file (" << fileName << ")." << endl;
936 return false;
937 }
938 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
939 _headerFlushed = false;
940 _threadIndex = 0;
941 //create requested number of threads
942 for (uint32_t i=0;i<_numThreads;i++)
943 pthread_create(&(_thread[i]), NULL, threadFunction, this);
944 //wait for the threads to start
945 while (_numThreads != _threadIndex)
946 usleep(1000);
947 //set the writing fence to the last thread
948 _threadIndex = _numThreads-1;
949 return (_file.good());
950}
951
952/****************************************************************
953 * WRITE HEADER
954 ****************************************************************/
955void CompressedFitsWriter::writeHeader(bool closingFile)
956{
957 if (_headerFlushed)
958 return;
959 if (!_file.is_open())
960 return;
961
962 long cPos = _file.tellp();
963
964 _file.seekp(0);
965
966 _file.write(_fitsHeader.c_str(), 2880);
967
968 //Write the DRS calib table here !
969 writeDrsCalib();
970
971 //we are now at the beginning of the main table. Write its header
972 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
973 _file.write(it->fitsString().c_str(), 80);
974
975 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
976 _file.write(it->fitsString().c_str(), 80);
977
978 _file.write("END ", 80);
979 long here = _file.tellp();
980 if (here%2880)
981 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
982
983 _headerFlushed = true;
984
985 here = _file.tellp();
986
987 if (here%2880)
988 cout << "Error: seems that header did not finish at the end of a block." << endl;
989
990 if (here > cPos && cPos != 0)
991 {
992 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
993 return;
994 }
995
996 here = _file.tellp();
997 writeCatalog(closingFile);
998
999 here = _file.tellp() - here;
1000 _heapPtr = here;
1001
1002 if (cPos != 0)
1003 _file.seekp(cPos);
1004}
1005
1006/****************************************************************
1007 * WRITE CATALOG
1008 * WARNING: writeCatalog is only meant to be used by writeHeader.
1009 * external usage will most likely corrupt the file
1010 ****************************************************************/
1011void CompressedFitsWriter::writeCatalog(bool closingFile)
1012{
1013 uint32_t sizeWritten = 0;
1014 for (uint32_t i=0;i<_catalog.size();i++)
1015 {
1016 for (uint32_t j=0;j<_catalog[i].size();j++)
1017 {
1018 //swap the bytes
1019 int8_t swappedEntry[16];
1020 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1021 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1022 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1023 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1024 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1025 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1026 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1027 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1028
1029 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1030 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1031 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1032 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1033 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1034 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1035 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1036 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1037 if (closingFile)
1038 {
1039 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1040 }
1041 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1042 sizeWritten += 2*sizeof(int64_t);
1043 }
1044 }
1045
1046 //we do not reserve space for now because fverify does not like that.
1047 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1048 return ;
1049
1050 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1051 if (sizeWritten % 2880 != 0)
1052 {
1053 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1054 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1055 }
1056}
1057
1058/****************************************************************
1059 * ADD HEADER CHECKSUM
1060 ****************************************************************/
1061void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1062{
1063 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1064 _checksum.add(it->fitsString().c_str(), 80);
1065 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1066 _checksum.add(it->fitsString().c_str(), 80);
1067 string end("END ");
1068 string space(" ");
1069 checksum.add(end.c_str(), 80);
1070 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1071 for (int i=0;i<headerRowsLeft;i++)
1072 checksum.add(space.c_str(), 80);
1073}
1074
1075/****************************************************************
1076 * CLOSE
1077 ****************************************************************/
1078bool CompressedFitsWriter::close()
1079{
1080 for (uint32_t i=0;i<_numThreads;i++)
1081 while (_threadStatus[i] != _THREAD_WAIT_)
1082 usleep(100000);
1083 for (uint32_t i=0;i<_numThreads;i++)
1084 _threadStatus[i] = _THREAD_EXIT_;
1085 for (uint32_t i=0;i<_numThreads;i++)
1086 pthread_join(_thread[i], NULL);
1087 //flush the rows that were not written yet
1088 if (_totalNumRows%_numRowsPerTile != 0)
1089 {
1090 copyTransposeTile(0);
1091
1092 _threadNumRows[0] = _totalNumRows;
1093 uint32_t numBytes = compressBuffer(0);
1094 writeCompressedDataToDisk(0, numBytes);
1095 }
1096 //compression stuff
1097 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1098 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1099 //TODO calculate the real offset from the main table to the start of the HEAP data area
1100 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1101 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1102
1103 //regular stuff
1104 if (_catalog.size() > 0)
1105 {
1106 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1107 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1108 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1109 int64_t heapSize = 0;
1110 int64_t compressedOffset = 0;
1111 for (uint32_t i=0;i<_catalog.size();i++)
1112 {
1113 compressedOffset += sizeof(TileHeader);
1114 heapSize += sizeof(TileHeader);
1115 for (uint32_t j=0;j<_catalog[i].size();j++)
1116 {
1117 heapSize += _catalog[i][j].first;
1118// cout << "heapSize: " << heapSize << endl;
1119 //set the catalog offsets to their actual values
1120 _catalog[i][j].second = compressedOffset;
1121 compressedOffset += _catalog[i][j].first;
1122 //special case if entry has zero length
1123 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1124 }
1125 }
1126 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1127 }
1128 else
1129 {
1130 setHeaderKey(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
1131 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1132 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1133 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1134 }
1135 writeHeader(true);
1136
1137 ostringstream str;
1138 str << _checksum.val();
1139
1140 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1141 addHeaderChecksum(_checksum);
1142 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1143 //update header value
1144 writeHeader();
1145 //update file length
1146 long here = _file.tellp();
1147 if (here%2880)
1148 {
1149 vector<char> nullVec(2880 - here%2880, 0);
1150 _file.write(nullVec.data(), 2880 - here%2880);
1151 }
1152 _file.close();
1153 return true;
1154}
1155
1156/****************************************************************
1157 * COPY TRANSPOSE TILE
1158 ****************************************************************/
1159void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1160{
1161 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1162 //copy the tile and transpose it
1163 uint32_t offset = 0;
1164 for (uint32_t i=0;i<_columns.size();i++)
1165 {
1166 switch (_columns[i].getColumnOrdering())//getCompression())
1167 {
1168 case FACT_ROW_MAJOR:
1169 for (uint32_t k=0;k<thisRoundNumRows;k++)
1170 {//regular, "semi-transposed" copy
1171 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1172 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1173 }
1174 break;
1175
1176 case FACT_COL_MAJOR :
1177 for (int j=0;j<_columns[i].numElems();j++)
1178 for (uint32_t k=0;k<thisRoundNumRows;k++)
1179 {//transposed copy
1180 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1181 offset += _columns[i].sizeOfElems();
1182 }
1183 break;
1184 default:
1185 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1186
1187 };
1188 }
1189}
1190
1191/****************************************************************
1192 * WRITE BINARY ROW
1193 ****************************************************************/
1194bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1195{
1196 if (_totalNumRows == 0)
1197 writeHeader();
1198
1199 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1200 _totalNumRows++;
1201 if (_totalNumRows%_numRowsPerTile == 0)
1202 {
1203 //which is the next thread that we should use ?
1204 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1205 usleep(100000);
1206
1207 copyTransposeTile(_threadLooper);
1208
1209 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1210 usleep(100000);
1211
1212 _threadNumRows[_threadLooper] = _totalNumRows;
1213 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1214 _threadLooper = (_threadLooper+1)%_numThreads;
1215 }
1216 return _file.good();
1217}
1218
1219/****************************************************************
1220 * COMPRESS BUFFER
1221 ****************************************************************/
1222uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1223{
1224 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1225 return numRows*sizeOfElems*numRowElems;
1226}
1227
1228/****************************************************************
1229 * COMPRESS BUFFER
1230 ****************************************************************/
1231uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1232{
1233 string huffmanOutput;
1234 uint32_t previousHuffmanSize = 0;
1235 if (numRows < 2)
1236 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1237 return numRows*sizeOfElems*numRowElems + 1000;
1238 }
1239 if (sizeOfElems < 2 )
1240 {
1241 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1242 return 0;
1243 }
1244 uint32_t huffmanOffset = 0;
1245 for (uint32_t j=0;j<numRowElems;j++)
1246 {
1247 Huffman::Encode(huffmanOutput,
1248 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1249 numRows*(sizeOfElems/2));
1250 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1251 huffmanOffset += sizeof(uint32_t);
1252 previousHuffmanSize = huffmanOutput.size();
1253 }
1254 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1255
1256 //only copy if not larger than not-compressed size
1257 if (totalSize < numRows*sizeOfElems*numRowElems)
1258 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1259
1260 return totalSize;
1261}
1262
1263/****************************************************************
1264 * COMPRESS BUFFER
1265 ****************************************************************/
1266uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1267{
1268 uint32_t colWidth = numRowElems;
1269 for (int j=colWidth*numRows-1;j>1;j--)
1270 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1271 //call the huffman transposed
1272 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1273}
1274
1275uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1276{
1277 uint32_t colWidth = numRowElems;
1278 for (int j=colWidth*numRows-1;j>1;j--)
1279 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1280
1281 return numRows*sizeOfElems*numRowElems;
1282}
1283/****************************************************************
1284 * COMPRESS BUFFER
1285 ****************************************************************/
1286uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1287{
1288 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1289 uint32_t offset=0;
1290 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1291 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1292
1293 //now compress each column one by one by calling compression on arrays
1294 for (uint32_t i=0;i<_columns.size();i++)
1295 {
1296 _catalog[currentCatalogRow][i].second = compressedOffset;
1297
1298 if (_columns[i].numElems() == 0) continue;
1299
1300 BlockHeader& head = _columns[i].getBlockHeader();
1301 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1302 //set the default byte telling if uncompressed the compressed Flag
1303 uint64_t previousOffset = compressedOffset;
1304 //skip header data
1305 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1306
1307 for (uint32_t j=0;j<sequence.size(); j++)
1308 {
1309 switch (sequence[j])
1310 {
1311 case FACT_RAW:
1312// if (head.numProcs == 1)
1313 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1314 break;
1315 case FACT_SMOOTHING:
1316 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1317 break;
1318 case FACT_HUFFMAN16:
1319 if (head.ordering == FACT_COL_MAJOR)
1320 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1321 else
1322 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1323 break;
1324 default:
1325 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1326 break;
1327 }
1328 }
1329
1330 //check if compressed size is larger than uncompressed
1331 if (sequence[0] != FACT_RAW &&
1332 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1333 {//if so set flag and redo it uncompressed
1334 cout << "REDOING UNCOMPRESSED" << endl;
1335 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1336 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1337 BlockHeader he;
1338 he.size = compressedOffset - previousOffset;
1339 he.numProcs = 1;
1340 he.ordering = FACT_ROW_MAJOR;
1341 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1342 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FACT_RAW;
1343 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1344 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1345 continue;
1346 }
1347 head.size = compressedOffset - previousOffset;
1348 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1349 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1350
1351 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1352 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1353 }
1354
1355 TileHeader tHead(thisRoundNumRows, compressedOffset);
1356 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1357 return compressedOffset;
1358}
1359
1360/****************************************************************
1361 * WRITE COMPRESS DATA TO DISK
1362 ****************************************************************/
1363bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1364{
1365 char* checkSumPointer = _compressedBuffer[threadID];
1366 int32_t extraBytes = 0;
1367 uint32_t sizeToChecksum = sizeToWrite;
1368 if (_checkOffset != 0)
1369 {//should we extend the array to the left ?
1370 sizeToChecksum += _checkOffset;
1371 checkSumPointer -= _checkOffset;
1372 memset(checkSumPointer, 0, _checkOffset);
1373 }
1374 if (sizeToChecksum%4 != 0)
1375 {//should we extend the array to the right ?
1376 extraBytes = 4 - (sizeToChecksum%4);
1377 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1378 sizeToChecksum += extraBytes;
1379 }
1380 //do the checksum
1381 _checksum.add(checkSumPointer, sizeToChecksum);
1382// cout << endl << "Checksum: " << _checksum.val() << endl;
1383 _checkOffset = (4 - extraBytes)%4;
1384 //write data to disk
1385 _file.write(_compressedBuffer[threadID], sizeToWrite);
1386 return _file.good();
1387}
1388
1389/****************************************************************
1390 * WRITER THREAD LOOP
1391 ****************************************************************/
1392void* CompressedFitsWriter::threadFunction(void* context)
1393{
1394 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1395
1396 uint32_t myID = 0;
1397 pthread_mutex_lock(&(myself->_mutex));
1398 myID = myself->_threadIndex++;
1399 pthread_mutex_unlock(&(myself->_mutex));
1400 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1401
1402 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1403 {
1404 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1405 usleep(100000);
1406 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1407 continue;
1408 uint32_t numBytes = myself->compressBuffer(myID);
1409 myself->_threadStatus[myID] = _THREAD_WRITE_;
1410
1411 //wait for the previous data to be written
1412 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1413 usleep(1000);
1414 //do the actual writing to disk
1415 pthread_mutex_lock(&(myself->_mutex));
1416 myself->writeCompressedDataToDisk(myID, numBytes);
1417 myself->_threadIndex = myID;
1418 pthread_mutex_unlock(&(myself->_mutex));
1419 myself->_threadStatus[myID] = _THREAD_WAIT_;
1420 }
1421 return NULL;
1422}
1423
1424/****************************************************************
1425 * PRINT USAGE
1426 ****************************************************************/
1427void printUsage()
1428{
1429 cout << endl;
1430 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1431 " and compresses it.\n It can use a drs calibration in order to"
1432 " improve the compression ratio. If so, the input calibration"
1433 " is embedded into the compressed file.\n"
1434 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1435 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1436 "Usage: Compressed_Fits_Test <inputFile>";
1437 cout << endl;
1438}
1439
1440/****************************************************************
1441 * PRINT HELP
1442 ****************************************************************/
1443void printHelp()
1444{
1445 cout << endl;
1446 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1447 "The fz extension will be added, replacing the .gz one if required \n"
1448 "If output is specified, then it will replace the automatically generated output filename\n"
1449 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1450 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1451 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1452 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1453 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1454 "UNCOMPRESSED\n"
1455 "AMPLITUDE\n"
1456 "HUFFMAN\n"
1457 "SMOOTHMAN\n"
1458 "INT_WAVELET\n"
1459 "\n"
1460 "--quiet removes any textual output, except error messages\n"
1461 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1462 ;
1463 cout << endl << endl;
1464}
1465
1466/****************************************************************
1467 * SETUP CONFIGURATION
1468 ****************************************************************/
1469void setupConfiguration(Configuration& conf)
1470{
1471 po::options_description configs("FitsCompressor options");
1472 configs.add_options()
1473 ("inputFile,i", vars<string>(), "Input file")
1474 ("drs,d", var<string>(), "Input drs calibration file")
1475 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1476 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1477 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1478 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1479 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1480 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1481 ;
1482 po::positional_options_description positional;
1483 positional.add("inputFile", -1);
1484 conf.AddOptions(configs);
1485 conf.SetArgumentPositions(positional);
1486}
1487
1488/****************************************************************
1489 * MAIN
1490 ****************************************************************/
1491int main(int argc, const char** argv)
1492{
1493 Configuration conf(argv[0]);
1494 conf.SetPrintUsage(printUsage);
1495 setupConfiguration(conf);
1496
1497 if (!conf.DoParse(argc, argv, printHelp))
1498 return -1;
1499
1500 //initialize the file names to nothing.
1501 string fileNameIn = "";
1502 string fileNameOut = "";
1503 string drsFileName = "";
1504 uint32_t numRowsPerTile = 100;
1505 bool displayText=true;
1506
1507 //parse configuration
1508 if (conf.Get<bool>("quiet")) displayText = false;
1509 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1510 if (inputFileNameVec.size() != 1)
1511 {
1512 cout << "Error: ";
1513 if (inputFileNameVec.size() == 0) cout << "no";
1514 else cout << inputFileNameVec.size();
1515 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1516 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1517 cout << inputFileNameVec[i] << endl;
1518 return -1;
1519 }
1520
1521 //Assign the input filename
1522 fileNameIn = inputFileNameVec[0];
1523
1524 //Check if we have a drs calib too
1525 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1526
1527 //Should we verify the data ?
1528 bool verifyDataPlease = false;
1529 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1530
1531
1532 //should we use a specific output filename ?
1533 if (conf.Has("output"))
1534 fileNameOut = conf.Get<string>("output");
1535 else
1536 {
1537 size_t pos = fileNameIn.find(".fits");
1538 if (pos == string::npos)
1539 {
1540 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1541 return -1;
1542 }
1543 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1544 }
1545
1546 //should we use specific compression on some columns ?
1547 const vector<string> columnsCompression = conf.Vec<string>("compression");
1548
1549 //split up values between column names and compression scheme
1550 vector<std::pair<string, string>> compressions;
1551 for (unsigned int i=0;i<columnsCompression.size();i++)
1552 {
1553 size_t pos = columnsCompression[i].find_first_of("=");
1554 if (pos == string::npos)
1555 {
1556 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1557 return -1;
1558 }
1559 string comp = columnsCompression[i].substr(pos+1);
1560 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1561 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1562 {
1563 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1564 return -1;
1565 }
1566 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1567 }
1568
1569 //How many rows per tile should we use ?
1570 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1571
1572 /************************************************************************************
1573 * Done reading configuration. Open relevant files
1574 ************************************************************************************/
1575
1576 //Open input's fits file
1577 factfits inFile(fileNameIn);
1578
1579 if (inFile.IsCompressedFITS())
1580 {
1581 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1582 return -1;
1583 }
1584
1585 //decide how many tiles should be put in the compressed file
1586 uint32_t originalNumRows = inFile.GetNumRows();
1587 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1588 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1589
1590 //should we use a specific number of threads for compressing ?
1591 unsigned int numThreads = 1;
1592 if (conf.Has("threads"))
1593 {
1594 numThreads = conf.Get<unsigned int>("threads");
1595 outFile.setNumWorkingThreads(numThreads);
1596 }
1597
1598 if (!outFile.open(fileNameOut))
1599 {
1600 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1601 return -1;
1602 }
1603
1604 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1605 factfits* drsFile = NULL;
1606 //try to open the Drs file. If any.
1607 if (drsFileName != "")
1608 {
1609 try
1610 {
1611 drsFile = new factfits(drsFileName);
1612 }
1613 catch (...)
1614 {
1615 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1616 return -1;
1617 }
1618 }
1619
1620 if (displayText)
1621 {
1622 cout << endl;
1623 cout << "**********************" << endl;
1624 cout << "Will compress from : " << fileNameIn << endl;
1625 cout << "to : " << fileNameOut << endl;
1626 if (drsFileName != "")
1627 cout << "while calibrating with: " << drsFileName << endl;
1628 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1629 cout << "Data will be verified : ";
1630 if (verifyDataPlease)
1631 cout << "yes" << endl;
1632 else
1633 cout << "no (WARNING !)" << endl;
1634 cout << "**********************" << endl;
1635 cout << endl;
1636 }
1637
1638 /************************************************************************************
1639 * Done opening input files. Allocate memory and configure output file
1640 ************************************************************************************/
1641
1642 //allocate the buffer for temporary storage of each read/written row
1643 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1644 char* buffer = new char[rowWidth];
1645
1646 //get the source columns
1647 const fits::Table::Columns& columns = inFile.GetColumns();
1648 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1649 if (displayText)
1650 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1651
1652 //Add columns.
1653 for (uint32_t i=0;i<sortedColumns.size(); i++)
1654 {
1655 //get column name
1656 ostringstream str;
1657 str << "TTYPE" << i+1;
1658 string colName = inFile.GetStr(str.str());
1659 if (displayText)
1660 {
1661 cout << "Column " << colName;
1662 for (uint32_t j=colName.size();j<21;j++)
1663 cout << " ";
1664 cout << " -> ";
1665 }
1666
1667 //get header structures
1668 BlockHeader rawHeader;
1669 BlockHeader smoothmanHeader(0, FACT_ROW_MAJOR, 2);
1670 vector<uint16_t> rawProcessings(1);
1671 rawProcessings[0] = FACT_RAW;
1672 vector<uint16_t> smoothmanProcessings(2);
1673 smoothmanProcessings[0] = FACT_SMOOTHING;
1674 smoothmanProcessings[1] = FACT_HUFFMAN16;
1675// smoothmanProcessings[2] = FACT_RAW;
1676
1677 //first lets see if we do have an explicit request
1678 bool explicitRequest = false;
1679 for (unsigned int j=0;j<compressions.size();j++)
1680 {
1681 if (compressions[j].first == colName)
1682 {
1683 explicitRequest = true;
1684 if (displayText) cout << compressions[j].second << endl;
1685 if (compressions[j].second == "UNCOMPRESSED")
1686 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1687 if (compressions[j].second == "SMOOTHMAN")
1688 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1689 break;
1690 }
1691 }
1692
1693 if (explicitRequest) continue;
1694
1695 if (colName != "Data")
1696 {
1697 if (displayText) cout << "UNCOMPRESSED" << endl;
1698 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1699 }
1700 else
1701 {
1702 if (displayText) cout << "SMOOTHMAN" << endl;
1703 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1704 }
1705 }
1706
1707 //translate original header entries to their Z-version
1708 const fits::Table::Keys& header = inFile.GetKeys();
1709 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1710 {
1711 string k = i->first;//header[i].key();
1712 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1713 continue;
1714 if (k == "CHECKSUM")
1715 {
1716 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1717 continue;
1718 }
1719 if (k == "DATASUM")
1720 {
1721 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1722 continue;
1723 }
1724 k = k.substr(0,5);
1725 if (k == "TTYPE" || k == "TFORM")
1726 {
1727 string tmpKey = i->second.fitsString;
1728 tmpKey[0] = 'Z';
1729 outFile.setHeaderKey(tmpKey);
1730 continue;
1731 }
1732 if (k == "NAXIS")
1733 continue;
1734 outFile.setHeaderKey(i->second.fitsString);
1735// cout << i->first << endl;
1736 }
1737
1738 //deal with the DRS calib
1739 int16_t* drsCalib16 = NULL;
1740
1741 //load the drs calib. data
1742 int32_t startCellOffset = -1;
1743 if (drsFileName != "")
1744 {
1745 drsCalib16 = new int16_t[1440*1024];
1746 float* drsCalibFloat = NULL;
1747 try
1748 {
1749 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1750 }
1751 catch (...)
1752 {
1753 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1754 return -1;
1755 }
1756
1757 //read the calibration and calculate its integer value
1758 drsFile->GetNextRow();
1759 for (uint32_t i=0;i<1440*1024;i++)
1760 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1761
1762
1763 //get the start cells offsets
1764 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1765 if (it->first == "StartCellData")
1766 {
1767 startCellOffset = it->second.offset;
1768 if (it->second.type != 'I')
1769 {
1770 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1771 return -1;
1772 }
1773 }
1774
1775 if (startCellOffset == -1)
1776 {
1777 cout << "WARNING: Could not find StartCellData in input file " << fileNameIn << ". Doing it uncalibrated"<< endl;
1778 }
1779 else
1780 {
1781 //assign it to the ouput file
1782 outFile.setDrsCalib(drsCalib16);
1783 }
1784 }
1785
1786 /************************************************************************************
1787 * Done configuring compression. Do the real job now !
1788 ************************************************************************************/
1789
1790 if (displayText) cout << "Converting file..." << endl;
1791
1792 int numSlices = -1;
1793 int32_t dataOffset = -1;
1794
1795 //Get the pointer to the column that must be drs-calibrated
1796 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1797 if (it->first == "Data")
1798 {
1799 numSlices = it->second.num;
1800 dataOffset = it->second.offset;
1801 }
1802 if (numSlices % 1440 != 0)
1803 {
1804 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1805 return -1;
1806 }
1807 if (dataOffset == -1)
1808 {
1809 cout << "Could not find the column Data in the input file. Aborting." << endl;
1810 return -1;
1811 }
1812
1813 numSlices /= 1440;
1814
1815 //set pointers to the readout data to later be able to gather it to "buffer".
1816 vector<void*> readPointers;
1817 vector<int32_t> readOffsets;
1818 vector<int32_t> readElemSize;
1819 vector<int32_t> readNumElems;
1820 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1821 {
1822 readPointers.push_back(inFile.SetPtrAddress(it->first));
1823 readOffsets.push_back(it->second.offset);
1824 readElemSize.push_back(it->second.size);
1825 readNumElems.push_back(it->second.num);
1826 }
1827
1828 //Convert each row one after the other
1829 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1830 {
1831 if (displayText) cout << "\r Row " << i+1 << flush;
1832 inFile.GetNextRow();
1833 //copy from inFile internal structures to buffer
1834 int32_t count=0;
1835 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1836 {
1837 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1838 count++;
1839 }
1840
1841 if (startCellOffset != -1)
1842 {//drs calibrate this data
1843 for (int j=0;j<1440;j++)
1844 {
1845 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1846 if (thisStartCell < 0) continue;
1847 for (int k=0;k<numSlices;k++)
1848 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1849 }
1850 }
1851 outFile.writeBinaryRow(buffer);
1852 };
1853
1854 //Get table name for later use in case the compressed file is to be verified
1855 string tableName = inFile.GetStr("EXTNAME");
1856
1857 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1858
1859 inFile.close();
1860 if (!outFile.close())
1861 {
1862 cout << "Something went wrong while writing the catalog: negative index" << endl;
1863 return false;
1864 }
1865
1866 if (drsCalib16 != NULL)
1867 delete[] drsCalib16;
1868
1869 if (displayText) cout << "Done." << endl;
1870
1871 /************************************************************************************
1872 * Actual job done. Should we verify what we did ?
1873 ************************************************************************************/
1874
1875 if (verifyDataPlease)
1876 {
1877 if (displayText) cout << "Now verify data..." << endl;
1878 }
1879 else
1880 return 0;
1881
1882 //get a compressed reader
1883//TEMP try to copy the file too
1884// string copyName("/gpfs/scratch/fact/etienne/copyFile.fz");
1885// string copyName(fileNameOut+".copy");
1886 string copyName("");
1887 factfits verifyFile(fileNameOut, copyName, tableName, false);
1888
1889 //and the header of the compressed file
1890 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1891
1892 //get a non-compressed writer
1893 ofits reconstructedFile;
1894
1895 //figure out its name: /dev/null unless otherwise specified
1896 string reconstructedName = fileNameOut+".recons";
1897 reconstructedName = "/dev/null";
1898 reconstructedFile.open(reconstructedName.c_str(), false);
1899
1900 //reconstruct the original columns from the compressed file.
1901 string origChecksumStr;
1902 string origDatasum;
1903
1904 //reset tablename value so that it is re-read from compressed table's header
1905 tableName = "";
1906
1907 /************************************************************************************
1908 * Reconstruction setup done. Rebuild original header
1909 ************************************************************************************/
1910
1911 //re-tranlate the keys
1912 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1913 {
1914 string k = it->first;
1915 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1916 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1917 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1918 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1919 {
1920 continue;
1921 }
1922
1923 if (k == "ZCHKSUM")
1924 {
1925 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1926 origChecksumStr = it->second.value;
1927 continue;
1928 }
1929
1930 if (k == "ZDTASUM")
1931 {
1932 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1933 origDatasum = it->second.value;
1934 continue;
1935 }
1936
1937 if (k == "EXTNAME")
1938 {
1939 tableName = it->second.value;
1940 }
1941
1942 k = k.substr(0,5);
1943
1944 if (k == "TTYPE")
1945 {//we have an original column name here.
1946 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1947 continue;
1948 }
1949
1950 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1951 {
1952 continue;
1953 }
1954
1955 if (k == "ZFORM" || k == "ZTYPE")
1956 {
1957 string tmpKey = it->second.fitsString;
1958 tmpKey[0] = 'T';
1959 reconstructedFile.SetKeyFromFitsString(tmpKey);
1960 continue;
1961 }
1962
1963 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1964 }
1965
1966 if (tableName == "")
1967 {
1968 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1969 return -1;
1970 }
1971
1972 //Restore the original columns
1973 for (uint32_t numCol=1; numCol<10000; numCol++)
1974 {
1975 ostringstream str;
1976 str << numCol;
1977 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
1978
1979 string ttype = verifyFile.GetStr("TTYPE"+str.str());
1980 string tform = verifyFile.GetStr("ZFORM"+str.str());
1981 char type = tform[tform.size()-1];
1982 string number = tform.substr(0, tform.size()-1);
1983 int numElems = atoi(number.c_str());
1984
1985 if (number == "") numElems=1;
1986
1987 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1988 }
1989
1990 reconstructedFile.WriteTableHeader(tableName.c_str());
1991
1992 /************************************************************************************
1993 * Original header restored. Do the data
1994 ************************************************************************************/
1995
1996 //set pointers to the readout data to later be able to gather it to "buffer".
1997 readPointers.clear();
1998 readOffsets.clear();
1999 readElemSize.clear();
2000 readNumElems.clear();
2001 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2002 {
2003 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
2004 readOffsets.push_back(it->second.offset);
2005 readElemSize.push_back(it->second.size);
2006 readNumElems.push_back(it->second.num);
2007 }
2008
2009 //do the actual reconstruction work
2010 uint32_t i=1;
2011 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2012 {
2013 int count=0;
2014 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2015 {
2016 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2017 count++;
2018 }
2019 if (displayText) cout << "\r Row " << i << flush;
2020 reconstructedFile.WriteRow(buffer, rowWidth);
2021 i++;
2022 }
2023
2024 if (displayText) cout << endl;
2025
2026 //close reconstruction input and output
2027// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2028// verifyFile.close();
2029 if (!verifyFile.IsFileOk())
2030 cout << "ERROR: file checksum seems wrong" << endl;
2031
2032 reconstructedFile.close();
2033
2034 //get original and reconstructed checksum and datasum
2035 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2036 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2037
2038 //verify that no mistake was made
2039 if (origChecksum.second != newChecksum.second)
2040 {
2041 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2042 return -1;
2043 }
2044 if (origChecksum.first != newChecksum.first)
2045 {
2046 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2047 }
2048 else
2049 {
2050 if (true) cout << "Ok" << endl;
2051 }
2052
2053 delete[] buffer;
2054 return 0;
2055
2056}
2057
Note: See TracBrowser for help on using the repository browser.