source: trunk/FACT++/src/fitsCompressor.cc@ 16869

Last change on this file since 16869 was 16848, checked in by lyard, 12 years ago
Fixed reconstructed file filename
File size: 85.0 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=FACT_ROW_MAJOR,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 ///@brief open a new fits file
427 bool open(const string& fileName, const string& tableName="Data");
428
429 ///@brief close the opened file
430 bool close();
431
432 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
433 bool writeBinaryRow(const char* bufferToWrite);
434
435 ///@brief assign a given (already loaded) drs calibration
436 void setDrsCalib(int16_t* data);
437
438 ///@brief set the number of worker threads compressing the data.
439 bool setNumWorkingThreads(uint32_t num);
440
441 private:
442 ///@brief compresses one buffer of data, the one given by threadIndex
443 uint64_t compressBuffer(uint32_t threadIndex);
444
445 ///@brief writes an already compressed buffer to disk
446 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
447
448 ///@brief add the header checksum to the datasum
449 void addHeaderChecksum(Checksum& checksum);
450
451 ///@brief write the header. If closingFile is set to true, checksum is calculated
452 void writeHeader(bool closingFile = false);
453
454 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
455 void writeCatalog(bool closingFile=false);
456
457 /// FIXME this was a bad idea. Move everything to the regular header
458 vector<HeaderEntry> _defaultHeader;
459
460 /// the main function compressing the data
461 static void* threadFunction(void* context);
462
463 /// Write the drs calibration to disk, if any
464 void writeDrsCalib();
465
466 /// Copy and transpose (or semi-transpose) one tile of data
467 void copyTransposeTile(uint32_t index);
468
469 /// Specific compression functions
470 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
471 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
472 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
473 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
474
475 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
476 int16_t* _drsCalibData; ///< array of the Drs baseline mean
477 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
478 pthread_mutex_t _mutex; ///< mutex for compressing threads
479
480
481 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
482 static string _fitsHeader; ///< the default header to be written in every fits file
483};
484
485const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
486const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
487const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
488const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
489const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
490const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
491
492template<>
493void CompressedFitsFile::HeaderEntry::setValue(const string& v)
494{
495 string val = v;
496 if (val.size() > 2 && val[0] == '\'')
497 {
498 size_t pos = val.find_last_of("'");
499 if (pos != string::npos && pos != 0)
500 val = val.substr(1, pos-1);
501 }
502 ostringstream str;
503
504 str << "'" << val << "'";
505 for (int i=str.str().length(); i<20;i++)
506 str << " ";
507 _value = str.str();
508 buildFitsString();
509}
510
511/**
512 * Default header to be written in all fits files
513 */
514string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
515 "BITPIX = 8 / number of bits per data pixel "
516 "NAXIS = 0 / number of data axes "
517 "EXTEND = T / FITS dataset may contain extensions "
518 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
519 "DATASUM = ' 0' / Checksum for the data block "
520 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
521 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
522 "END "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " ";
550
551/**
552 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
553 *
554 */
555string CompressedFitsWriter::_emptyBlock = " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " ";
591
592CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
593/****************************************************************
594 * SUPER CLASS DEFAULT CONSTRUCTOR
595 ****************************************************************/
596CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
597 uint32_t numRowsPerTile) : _header(),
598 _columns(),
599 _numTiles(numTiles),
600 _numRowsPerTile(numRowsPerTile),
601 _totalNumRows(0),
602 _rowWidth(0),
603 _headerFlushed(false),
604 _buffer(NULL),
605 _checksum(0),
606 _heapPtr(0),
607 _transposedBuffer(1),
608 _compressedBuffer(1),
609 _numThreads(1),
610 _threadIndex(0),
611 _thread(1),
612 _threadNumRows(1),
613 _threadStatus(1)
614{
615 _catalog.resize(_numTiles);
616 _transposedBuffer[0] = NULL;
617 _compressedBuffer[0] = NULL;
618 _threadStatus[0] = _THREAD_WAIT_;
619 _threadNumRows[0] = 0;
620}
621
622/****************************************************************
623 * SUPER CLASS DEFAULT DESTRUCTOR
624 ****************************************************************/
625CompressedFitsFile::~CompressedFitsFile()
626{
627 if (_buffer != NULL)
628 {
629 delete[] _buffer;
630 _buffer = NULL;
631 for (uint32_t i=0;i<_numThreads;i++)
632 {
633 _compressedBuffer[i] = _compressedBuffer[i]-4;
634 delete[] _transposedBuffer[i];
635 delete[] _compressedBuffer[i];
636 _transposedBuffer[i] = NULL;
637 _compressedBuffer[i] = NULL;
638 }
639 }
640 if (_file.is_open())
641 _file.close();
642}
643
644/****************************************************************
645 * REALLOCATE BUFFER
646 ****************************************************************/
647bool CompressedFitsFile::reallocateBuffers()
648{
649 if (_buffer)
650 {
651 delete[] _buffer;
652 for (uint32_t i=0;i<_compressedBuffer.size();i++)
653 {
654 _compressedBuffer[i] = _compressedBuffer[i]-4;
655 delete[] _transposedBuffer[i];
656 delete[] _compressedBuffer[i];
657 }
658 }
659 _buffer = new char[_rowWidth*_numRowsPerTile];
660 if (_buffer == NULL) return false;
661 if (_compressedBuffer.size() != _numThreads)
662 {
663 _transposedBuffer.resize(_numThreads);
664 _compressedBuffer.resize(_numThreads);
665 }
666 for (uint32_t i=0;i<_numThreads;i++)
667 {
668 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
669 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader)]; //use a bit more memory for compression flags
670 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
671 return false;
672 //shift the compressed buffer by 4 bytes, for checksum calculation
673 memset(_compressedBuffer[i], 0, 4);
674 _compressedBuffer[i] = _compressedBuffer[i]+4;
675 //initialize the tile header
676 TileHeader tileHeader;
677 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
678 }
679 return true;
680}
681
682/****************************************************************
683 * DEFAULT WRITER CONSTRUCTOR
684 ****************************************************************/
685CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
686 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
687 _checkOffset(0),
688 _drsCalibData(NULL),
689 _threadLooper(0)
690{
691 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
692 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
693 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
694 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
695 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
696 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
697 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
698 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
699 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
700 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
701 //compression stuff
702 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
703 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
704 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
705 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
706 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
707 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
708 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
709
710 pthread_mutex_init(&_mutex, NULL);
711}
712
713/****************************************************************
714 * DEFAULT DESTRUCTOR
715 ****************************************************************/
716CompressedFitsWriter::~CompressedFitsWriter()
717{
718 pthread_mutex_destroy(&_mutex);
719}
720
721/****************************************************************
722 * SET THE POINTER TO THE DRS CALIBRATION
723 ****************************************************************/
724void CompressedFitsWriter::setDrsCalib(int16_t* data)
725{
726 _drsCalibData = data;
727}
728
729/****************************************************************
730 * SET NUM WORKING THREADS
731 ****************************************************************/
732bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
733{
734 if (_file.is_open())
735 return false;
736 if (num < 1 || num > 64)
737 {
738 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
739 return false;
740 }
741 _numThreads = num;
742 _transposedBuffer[0] = NULL;
743 _compressedBuffer[0] = NULL;
744 _threadStatus.resize(num);
745 _thread.resize(num);
746 _threadNumRows.resize(num);
747 for (uint32_t i=0;i<num;i++)
748 {
749 _threadNumRows[i] = 0;
750 _threadStatus[i] = _THREAD_WAIT_;
751 }
752 return reallocateBuffers();
753}
754
755/****************************************************************
756 * WRITE DRS CALIBRATION TO FILE
757 ****************************************************************/
758void CompressedFitsWriter::writeDrsCalib()
759{
760 //if file was not loaded, ignore
761 if (_drsCalibData == NULL)
762 return;
763 uint64_t whereDidIStart = _file.tellp();
764 vector<HeaderEntry> header;
765 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
766 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
767 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
768 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
769 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
770 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
771 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
772 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
773 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
774 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
775 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
776 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
777 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
778
779 for (uint32_t i=0;i<header.size();i++)
780 _file.write(header[i].fitsString().c_str(), 80);
781 //End the header
782 _file.write("END ", 80);
783 long here = _file.tellp();
784 if (here%2880)
785 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
786 //now write the data itself
787 int16_t* swappedBytes = new int16_t[1024];
788 Checksum checksum;
789 for (int32_t i=0;i<1440;i++)
790 {
791 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
792 for (int32_t j=0;j<2048;j+=2)
793 {
794 int8_t inter;
795 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
796 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
797 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
798 }
799 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
800 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
801 }
802 uint64_t whereDidIStop = _file.tellp();
803 delete[] swappedBytes;
804 //No need to pad the data, as (1440*1024*2)%2880==0
805
806 //calculate the checksum from the header
807 ostringstream str;
808 str << checksum.val();
809 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
810 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
811 checksum.add(it->fitsString().c_str(), 80);
812 string end("END ");
813 string space(" ");
814 checksum.add(end.c_str(), 80);
815 int headerRowsLeft = 36 - (header.size() + 1)%36;
816 for (int i=0;i<headerRowsLeft;i++)
817 checksum.add(space.c_str(), 80);
818 //udpate the checksum keyword
819 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
820 //and eventually re-write the header data
821 _file.seekp(whereDidIStart);
822 for (uint32_t i=0;i<header.size();i++)
823 _file.write(header[i].fitsString().c_str(), 80);
824 _file.seekp(whereDidIStop);
825}
826
827/****************************************************************
828 * ADD COLUMN
829 ****************************************************************/
830bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
831{
832 if (_totalNumRows != 0)
833 {
834 cout << "Error: cannot add new columns once first row has been written" << endl;
835 return false;
836 }
837 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
838 {
839 if (it->name() == column.name())
840 {
841 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
842 return false;
843 }
844 }
845 _columns.push_back(column);
846 _columns.back().setOffset(_rowWidth);
847 _rowWidth += column.width();
848 reallocateBuffers();
849
850 ostringstream str, str2, str3;
851 str << "TTYPE" << _columns.size();
852 str2 << column.name();
853 str3 << "label for field ";
854 if (_columns.size() < 10) str3 << " ";
855 if (_columns.size() < 100) str3 << " ";
856 str3 << _columns.size();
857 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
858 str.str("");
859 str2.str("");
860 str3.str("");
861 str << "TFORM" << _columns.size();
862 str2 << "1QB";
863 str3 << "data format of field " << _columns.size();
864 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
865 str.str("");
866 str2.str("");
867 str3.str("");
868 str << "ZFORM" << _columns.size();
869 str2 << column.numElems() << column.type();
870 str3 << "Original format of field " << _columns.size();
871 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
872 str.str("");
873 str2.str("");
874 str3.str("");
875 str << "ZCTYP" << _columns.size();
876 str2 << column.getCompressionString();
877 str3 << "Comp. Scheme of field " << _columns.size();
878 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
879 //resize the catalog vector accordingly
880 for (uint32_t i=0;i<_numTiles;i++)
881 {
882 _catalog[i].resize(_columns.size());
883 for (uint32_t j=0;j<_catalog[i].size();j++)
884 _catalog[i][j] = make_pair(0,0);
885 }
886 return true;
887}
888
889/****************************************************************
890 * SET HEADER KEY
891 ****************************************************************/
892bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
893{
894 HeaderEntry ent = entry;
895 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
896 {
897 if (it->key() == entry.key())
898 {
899 if (entry.comment() == "")
900 ent.setComment(it->comment());
901 (*it) = ent;
902 _headerFlushed = false;
903 return true;
904 }
905 }
906 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
907 {
908 if (it->key() == entry.key())
909 {
910 if (entry.comment() == "")
911 ent.setComment(it->comment());
912 (*it) = ent;
913 _headerFlushed = false;
914 return true;
915 }
916 }
917 if (_totalNumRows != 0)
918 {
919 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
920 return false;
921 }
922 _header.push_back(entry);
923 _headerFlushed = false;
924 return true;
925}
926
927/****************************************************************
928 * OPEN
929 ****************************************************************/
930bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
931{
932 _file.open(fileName.c_str(), ios_base::out);
933 if (!_file.is_open())
934 {
935 cout << "Error: Could not open the file (" << fileName << ")." << endl;
936 return false;
937 }
938 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
939 _headerFlushed = false;
940 _threadIndex = 0;
941 //create requested number of threads
942 for (uint32_t i=0;i<_numThreads;i++)
943 pthread_create(&(_thread[i]), NULL, threadFunction, this);
944 //wait for the threads to start
945 while (_numThreads != _threadIndex)
946 usleep(1000);
947 //set the writing fence to the last thread
948 _threadIndex = _numThreads-1;
949 return (_file.good());
950}
951
952/****************************************************************
953 * WRITE HEADER
954 ****************************************************************/
955void CompressedFitsWriter::writeHeader(bool closingFile)
956{
957 if (_headerFlushed)
958 return;
959 if (!_file.is_open())
960 return;
961
962 long cPos = _file.tellp();
963
964 _file.seekp(0);
965
966 _file.write(_fitsHeader.c_str(), 2880);
967
968 //Write the DRS calib table here !
969 writeDrsCalib();
970
971 //we are now at the beginning of the main table. Write its header
972 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
973 _file.write(it->fitsString().c_str(), 80);
974
975 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
976 _file.write(it->fitsString().c_str(), 80);
977
978 _file.write("END ", 80);
979 long here = _file.tellp();
980 if (here%2880)
981 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
982
983 _headerFlushed = true;
984
985 here = _file.tellp();
986
987 if (here%2880)
988 cout << "Error: seems that header did not finish at the end of a block." << endl;
989
990 if (here > cPos && cPos != 0)
991 {
992 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
993 return;
994 }
995
996 here = _file.tellp();
997 writeCatalog(closingFile);
998
999 here = _file.tellp() - here;
1000 _heapPtr = here;
1001
1002 if (cPos != 0)
1003 _file.seekp(cPos);
1004}
1005
1006/****************************************************************
1007 * WRITE CATALOG
1008 * WARNING: writeCatalog is only meant to be used by writeHeader.
1009 * external usage will most likely corrupt the file
1010 ****************************************************************/
1011void CompressedFitsWriter::writeCatalog(bool closingFile)
1012{
1013 uint32_t sizeWritten = 0;
1014 for (uint32_t i=0;i<_catalog.size();i++)
1015 {
1016 for (uint32_t j=0;j<_catalog[i].size();j++)
1017 {
1018 //swap the bytes
1019 int8_t swappedEntry[16];
1020 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1021 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1022 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1023 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1024 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1025 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1026 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1027 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1028
1029 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1030 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1031 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1032 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1033 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1034 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1035 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1036 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1037 if (closingFile)
1038 {
1039 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1040 }
1041 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1042 sizeWritten += 2*sizeof(int64_t);
1043 }
1044 }
1045
1046 //we do not reserve space for now because fverify does not like that.
1047 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1048 return ;
1049
1050 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1051 if (sizeWritten % 2880 != 0)
1052 {
1053 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1054 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1055 }
1056}
1057
1058/****************************************************************
1059 * ADD HEADER CHECKSUM
1060 ****************************************************************/
1061void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1062{
1063 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1064 _checksum.add(it->fitsString().c_str(), 80);
1065 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1066 _checksum.add(it->fitsString().c_str(), 80);
1067 string end("END ");
1068 string space(" ");
1069 checksum.add(end.c_str(), 80);
1070 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1071 for (int i=0;i<headerRowsLeft;i++)
1072 checksum.add(space.c_str(), 80);
1073}
1074
1075/****************************************************************
1076 * CLOSE
1077 ****************************************************************/
1078bool CompressedFitsWriter::close()
1079{
1080 for (uint32_t i=0;i<_numThreads;i++)
1081 while (_threadStatus[i] != _THREAD_WAIT_)
1082 usleep(100000);
1083 for (uint32_t i=0;i<_numThreads;i++)
1084 _threadStatus[i] = _THREAD_EXIT_;
1085 for (uint32_t i=0;i<_numThreads;i++)
1086 pthread_join(_thread[i], NULL);
1087 //flush the rows that were not written yet
1088 if (_totalNumRows%_numRowsPerTile != 0)
1089 {
1090 copyTransposeTile(0);
1091
1092 _threadNumRows[0] = _totalNumRows;
1093 uint32_t numBytes = compressBuffer(0);
1094 writeCompressedDataToDisk(0, numBytes);
1095 }
1096 //compression stuff
1097 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1098 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1099 //TODO calculate the real offset from the main table to the start of the HEAP data area
1100 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1101 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1102
1103 //regular stuff
1104 if (_catalog.size() > 0)
1105 {
1106 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1107 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1108 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1109 int64_t heapSize = 0;
1110 int64_t compressedOffset = 0;
1111 for (uint32_t i=0;i<_catalog.size();i++)
1112 {
1113 compressedOffset += sizeof(TileHeader);
1114 heapSize += sizeof(TileHeader);
1115 for (uint32_t j=0;j<_catalog[i].size();j++)
1116 {
1117 heapSize += _catalog[i][j].first;
1118// cout << "heapSize: " << heapSize << endl;
1119 //set the catalog offsets to their actual values
1120 _catalog[i][j].second = compressedOffset;
1121 compressedOffset += _catalog[i][j].first;
1122 //special case if entry has zero length
1123 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1124 }
1125 }
1126 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1127 }
1128 else
1129 {
1130 setHeaderKey(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
1131 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1132 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1133 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1134 }
1135 writeHeader(true);
1136
1137 ostringstream str;
1138 str << _checksum.val();
1139
1140 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1141 addHeaderChecksum(_checksum);
1142 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1143 //update header value
1144 writeHeader();
1145 //update file length
1146 long here = _file.tellp();
1147 if (here%2880)
1148 {
1149 vector<char> nullVec(2880 - here%2880, 0);
1150 _file.write(nullVec.data(), 2880 - here%2880);
1151 }
1152 _file.close();
1153 return true;
1154}
1155
1156/****************************************************************
1157 * COPY TRANSPOSE TILE
1158 ****************************************************************/
1159void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1160{
1161 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1162 //copy the tile and transpose it
1163 uint32_t offset = 0;
1164 for (uint32_t i=0;i<_columns.size();i++)
1165 {
1166 switch (_columns[i].getColumnOrdering())//getCompression())
1167 {
1168 case FACT_ROW_MAJOR:
1169 for (uint32_t k=0;k<thisRoundNumRows;k++)
1170 {//regular, "semi-transposed" copy
1171 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1172 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1173 }
1174 break;
1175
1176 case FACT_COL_MAJOR :
1177 for (int j=0;j<_columns[i].numElems();j++)
1178 for (uint32_t k=0;k<thisRoundNumRows;k++)
1179 {//transposed copy
1180 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1181 offset += _columns[i].sizeOfElems();
1182 }
1183 break;
1184 default:
1185 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1186
1187 };
1188 }
1189}
1190
1191/****************************************************************
1192 * WRITE BINARY ROW
1193 ****************************************************************/
1194bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1195{
1196 if (_totalNumRows == 0)
1197 writeHeader();
1198
1199 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1200 _totalNumRows++;
1201 if (_totalNumRows%_numRowsPerTile == 0)
1202 {
1203 //which is the next thread that we should use ?
1204 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1205 usleep(100000);
1206
1207 copyTransposeTile(_threadLooper);
1208
1209 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1210 usleep(100000);
1211
1212 _threadNumRows[_threadLooper] = _totalNumRows;
1213 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1214 _threadLooper = (_threadLooper+1)%_numThreads;
1215 }
1216 return _file.good();
1217}
1218
1219/****************************************************************
1220 * COMPRESS BUFFER
1221 ****************************************************************/
1222uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1223{
1224 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1225 return numRows*sizeOfElems*numRowElems;
1226}
1227
1228/****************************************************************
1229 * COMPRESS BUFFER
1230 ****************************************************************/
1231uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1232{
1233 string huffmanOutput;
1234 uint32_t previousHuffmanSize = 0;
1235 if (numRows < 2)
1236 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1237 return numRows*sizeOfElems*numRowElems + 1000;
1238 }
1239 if (sizeOfElems < 2 )
1240 {
1241 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1242 return 0;
1243 }
1244 uint32_t huffmanOffset = 0;
1245 for (uint32_t j=0;j<numRowElems;j++)
1246 {
1247 Huffman::Encode(huffmanOutput,
1248 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1249 numRows*(sizeOfElems/2));
1250 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1251 huffmanOffset += sizeof(uint32_t);
1252 previousHuffmanSize = huffmanOutput.size();
1253 }
1254 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1255
1256 //only copy if not larger than not-compressed size
1257 if (totalSize < numRows*sizeOfElems*numRowElems)
1258 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1259
1260 return totalSize;
1261}
1262
1263/****************************************************************
1264 * COMPRESS BUFFER
1265 ****************************************************************/
1266uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1267{
1268 uint32_t colWidth = numRowElems;
1269 for (int j=colWidth*numRows-1;j>1;j--)
1270 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1271 //call the huffman transposed
1272 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1273}
1274
1275uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1276{
1277 uint32_t colWidth = numRowElems;
1278 for (int j=colWidth*numRows-1;j>1;j--)
1279 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1280
1281 return numRows*sizeOfElems*numRowElems;
1282}
1283/****************************************************************
1284 * COMPRESS BUFFER
1285 ****************************************************************/
1286uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1287{
1288 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1289 uint32_t offset=0;
1290 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1291 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1292
1293 //now compress each column one by one by calling compression on arrays
1294 for (uint32_t i=0;i<_columns.size();i++)
1295 {
1296 _catalog[currentCatalogRow][i].second = compressedOffset;
1297
1298 if (_columns[i].numElems() == 0) continue;
1299
1300 BlockHeader& head = _columns[i].getBlockHeader();
1301 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1302 //set the default byte telling if uncompressed the compressed Flag
1303 uint64_t previousOffset = compressedOffset;
1304 //skip header data
1305 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1306
1307 for (uint32_t j=0;j<sequence.size(); j++)
1308 {
1309 switch (sequence[j])
1310 {
1311 case FACT_RAW:
1312// if (head.numProcs == 1)
1313 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1314 break;
1315 case FACT_SMOOTHING:
1316 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1317 break;
1318 case FACT_HUFFMAN16:
1319 if (head.ordering == FACT_COL_MAJOR)
1320 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1321 else
1322 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1323 break;
1324 default:
1325 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1326 break;
1327 }
1328 }
1329
1330 //check if compressed size is larger than uncompressed
1331 if (sequence[0] != FACT_RAW &&
1332 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1333 {//if so set flag and redo it uncompressed
1334 cout << "REDOING UNCOMPRESSED" << endl;
1335 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1336 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1337 BlockHeader he;
1338 he.size = compressedOffset - previousOffset;
1339 he.numProcs = 1;
1340 he.ordering = FACT_ROW_MAJOR;
1341 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1342 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FACT_RAW;
1343 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1344 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1345 continue;
1346 }
1347 head.size = compressedOffset - previousOffset;
1348 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1349 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1350
1351 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1352 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1353 }
1354
1355 TileHeader tHead(thisRoundNumRows, compressedOffset);
1356 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1357 return compressedOffset;
1358}
1359
1360/****************************************************************
1361 * WRITE COMPRESS DATA TO DISK
1362 ****************************************************************/
1363bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1364{
1365 char* checkSumPointer = _compressedBuffer[threadID];
1366 int32_t extraBytes = 0;
1367 uint32_t sizeToChecksum = sizeToWrite;
1368 if (_checkOffset != 0)
1369 {//should we extend the array to the left ?
1370 sizeToChecksum += _checkOffset;
1371 checkSumPointer -= _checkOffset;
1372 memset(checkSumPointer, 0, _checkOffset);
1373 }
1374 if (sizeToChecksum%4 != 0)
1375 {//should we extend the array to the right ?
1376 extraBytes = 4 - (sizeToChecksum%4);
1377 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1378 sizeToChecksum += extraBytes;
1379 }
1380 //do the checksum
1381 _checksum.add(checkSumPointer, sizeToChecksum);
1382 _checkOffset = (4 - extraBytes)%4;
1383 //write data to disk
1384 _file.write(_compressedBuffer[threadID], sizeToWrite);
1385 return _file.good();
1386}
1387
1388/****************************************************************
1389 * WRITER THREAD LOOP
1390 ****************************************************************/
1391void* CompressedFitsWriter::threadFunction(void* context)
1392{
1393 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1394
1395 uint32_t myID = 0;
1396 pthread_mutex_lock(&(myself->_mutex));
1397 myID = myself->_threadIndex++;
1398 pthread_mutex_unlock(&(myself->_mutex));
1399 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1400
1401 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1402 {
1403 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1404 usleep(100000);
1405 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1406 continue;
1407 uint32_t numBytes = myself->compressBuffer(myID);
1408 myself->_threadStatus[myID] = _THREAD_WRITE_;
1409
1410 //wait for the previous data to be written
1411 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1412 usleep(1000);
1413 //do the actual writing to disk
1414 pthread_mutex_lock(&(myself->_mutex));
1415 myself->writeCompressedDataToDisk(myID, numBytes);
1416 myself->_threadIndex = myID;
1417 pthread_mutex_unlock(&(myself->_mutex));
1418 myself->_threadStatus[myID] = _THREAD_WAIT_;
1419 }
1420 return NULL;
1421}
1422
1423/****************************************************************
1424 * PRINT USAGE
1425 ****************************************************************/
1426void printUsage()
1427{
1428 cout << endl;
1429 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1430 " and compresses it.\n It can use a drs calibration in order to"
1431 " improve the compression ratio. If so, the input calibration"
1432 " is embedded into the compressed file.\n"
1433 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1434 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1435 "Usage: Compressed_Fits_Test <inputFile>";
1436 cout << endl;
1437}
1438
1439/****************************************************************
1440 * PRINT HELP
1441 ****************************************************************/
1442void printHelp()
1443{
1444 cout << endl;
1445 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1446 "The fz extension will be added, replacing the .gz one if required \n"
1447 "If output is specified, then it will replace the automatically generated output filename\n"
1448 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1449 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1450 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1451 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1452 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1453 "UNCOMPRESSED\n"
1454 "AMPLITUDE\n"
1455 "HUFFMAN\n"
1456 "SMOOTHMAN\n"
1457 "INT_WAVELET\n"
1458 "\n"
1459 "--quiet removes any textual output, except error messages\n"
1460 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1461 ;
1462 cout << endl << endl;
1463}
1464
1465/****************************************************************
1466 * SETUP CONFIGURATION
1467 ****************************************************************/
1468void setupConfiguration(Configuration& conf)
1469{
1470 po::options_description configs("FitsCompressor options");
1471 configs.add_options()
1472 ("inputFile,i", vars<string>(), "Input file")
1473 ("drs,d", var<string>(), "Input drs calibration file")
1474 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1475 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1476 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1477 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1478 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1479 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1480 ;
1481 po::positional_options_description positional;
1482 positional.add("inputFile", -1);
1483 conf.AddOptions(configs);
1484 conf.SetArgumentPositions(positional);
1485}
1486
1487/****************************************************************
1488 * MAIN
1489 ****************************************************************/
1490int main(int argc, const char** argv)
1491{
1492 Configuration conf(argv[0]);
1493 conf.SetPrintUsage(printUsage);
1494 setupConfiguration(conf);
1495
1496 if (!conf.DoParse(argc, argv, printHelp))
1497 return -1;
1498
1499 //initialize the file names to nothing.
1500 string fileNameIn = "";
1501 string fileNameOut = "";
1502 string drsFileName = "";
1503 uint32_t numRowsPerTile = 100;
1504 bool displayText=true;
1505
1506 //parse configuration
1507 if (conf.Get<bool>("quiet")) displayText = false;
1508 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1509 if (inputFileNameVec.size() != 1)
1510 {
1511 cout << "Error: ";
1512 if (inputFileNameVec.size() == 0) cout << "no";
1513 else cout << inputFileNameVec.size();
1514 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1515 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1516 cout << inputFileNameVec[i] << endl;
1517 return -1;
1518 }
1519
1520 //Assign the input filename
1521 fileNameIn = inputFileNameVec[0];
1522
1523 //Check if we have a drs calib too
1524 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1525
1526 //Should we verify the data ?
1527 bool verifyDataPlease = false;
1528 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1529
1530
1531 //should we use a specific output filename ?
1532 if (conf.Has("output"))
1533 fileNameOut = conf.Get<string>("output");
1534 else
1535 {
1536 size_t pos = fileNameIn.find(".fits");
1537 if (pos == string::npos)
1538 {
1539 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1540 return -1;
1541 }
1542 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1543 }
1544
1545 //should we use specific compression on some columns ?
1546 const vector<string> columnsCompression = conf.Vec<string>("compression");
1547
1548 //split up values between column names and compression scheme
1549 vector<std::pair<string, string>> compressions;
1550 for (unsigned int i=0;i<columnsCompression.size();i++)
1551 {
1552 size_t pos = columnsCompression[i].find_first_of("=");
1553 if (pos == string::npos)
1554 {
1555 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1556 return -1;
1557 }
1558 string comp = columnsCompression[i].substr(pos+1);
1559 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1560 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1561 {
1562 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1563 return -1;
1564 }
1565 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1566 }
1567
1568 //How many rows per tile should we use ?
1569 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1570
1571 /************************************************************************************
1572 * Done reading configuration. Open relevant files
1573 ************************************************************************************/
1574
1575 //Open input's fits file
1576 factfits inFile(fileNameIn);
1577
1578 if (inFile.IsCompressedFITS())
1579 {
1580 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1581 return -1;
1582 }
1583
1584 //decide how many tiles should be put in the compressed file
1585 uint32_t originalNumRows = inFile.GetNumRows();
1586 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1587 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1588
1589 //should we use a specific number of threads for compressing ?
1590 unsigned int numThreads = 1;
1591 if (conf.Has("threads"))
1592 {
1593 numThreads = conf.Get<unsigned int>("threads");
1594 outFile.setNumWorkingThreads(numThreads);
1595 }
1596
1597 if (!outFile.open(fileNameOut))
1598 {
1599 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1600 return -1;
1601 }
1602
1603 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1604 factfits* drsFile = NULL;
1605 //try to open the Drs file. If any.
1606 if (drsFileName != "")
1607 {
1608 try
1609 {
1610 drsFile = new factfits(drsFileName);
1611 }
1612 catch (...)
1613 {
1614 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1615 return -1;
1616 }
1617 }
1618
1619 if (displayText)
1620 {
1621 cout << endl;
1622 cout << "**********************" << endl;
1623 cout << "Will compress from : " << fileNameIn << endl;
1624 cout << "to : " << fileNameOut << endl;
1625 if (drsFileName != "")
1626 cout << "while calibrating with: " << drsFileName << endl;
1627 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1628 cout << "Data will be verified : ";
1629 if (verifyDataPlease)
1630 cout << "yes" << endl;
1631 else
1632 cout << "no (WARNING !)" << endl;
1633 cout << "**********************" << endl;
1634 cout << endl;
1635 }
1636
1637 /************************************************************************************
1638 * Done opening input files. Allocate memory and configure output file
1639 ************************************************************************************/
1640
1641 //allocate the buffer for temporary storage of each read/written row
1642 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1643 char* buffer = new char[rowWidth];
1644
1645 //get the source columns
1646 const fits::Table::Columns& columns = inFile.GetColumns();
1647 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1648 if (displayText)
1649 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1650
1651 //Add columns.
1652 for (uint32_t i=0;i<sortedColumns.size(); i++)
1653 {
1654 //get column name
1655 ostringstream str;
1656 str << "TTYPE" << i+1;
1657 string colName = inFile.GetStr(str.str());
1658 if (displayText)
1659 {
1660 cout << "Column " << colName;
1661 for (uint32_t j=colName.size();j<21;j++)
1662 cout << " ";
1663 cout << " -> ";
1664 }
1665
1666 //get header structures
1667 BlockHeader rawHeader;
1668 BlockHeader smoothmanHeader(0, FACT_ROW_MAJOR, 2);
1669 vector<uint16_t> rawProcessings(1);
1670 rawProcessings[0] = FACT_RAW;
1671 vector<uint16_t> smoothmanProcessings(2);
1672 smoothmanProcessings[0] = FACT_SMOOTHING;
1673 smoothmanProcessings[1] = FACT_HUFFMAN16;
1674// smoothmanProcessings[2] = FACT_RAW;
1675
1676 //first lets see if we do have an explicit request
1677 bool explicitRequest = false;
1678 for (unsigned int j=0;j<compressions.size();j++)
1679 {
1680 if (compressions[j].first == colName)
1681 {
1682 explicitRequest = true;
1683 if (displayText) cout << compressions[j].second << endl;
1684 if (compressions[j].second == "UNCOMPRESSED")
1685 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1686 if (compressions[j].second == "SMOOTHMAN")
1687 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1688 break;
1689 }
1690 }
1691
1692 if (explicitRequest) continue;
1693
1694 if (colName != "Data")
1695 {
1696 if (displayText) cout << "UNCOMPRESSED" << endl;
1697 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1698 }
1699 else
1700 {
1701 if (displayText) cout << "SMOOTHMAN" << endl;
1702 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1703 }
1704 }
1705
1706 //translate original header entries to their Z-version
1707 const fits::Table::Keys& header = inFile.GetKeys();
1708 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1709 {
1710 string k = i->first;//header[i].key();
1711 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1712 continue;
1713 if (k == "CHECKSUM")
1714 {
1715 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1716 continue;
1717 }
1718 if (k == "DATASUM")
1719 {
1720 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1721 continue;
1722 }
1723 k = k.substr(0,5);
1724 if (k == "TTYPE" || k == "TFORM")
1725 {
1726 string tmpKey = i->second.fitsString;
1727 tmpKey[0] = 'Z';
1728 outFile.setHeaderKey(tmpKey);
1729 continue;
1730 }
1731 if (k == "NAXIS")
1732 continue;
1733 outFile.setHeaderKey(i->second.fitsString);
1734// cout << i->first << endl;
1735 }
1736
1737 //deal with the DRS calib
1738 int16_t* drsCalib16 = NULL;
1739
1740 //load the drs calib. data
1741 int32_t startCellOffset = -1;
1742 if (drsFileName != "")
1743 {
1744 drsCalib16 = new int16_t[1440*1024];
1745 float* drsCalibFloat = NULL;
1746 try
1747 {
1748 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1749 }
1750 catch (...)
1751 {
1752 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1753 return -1;
1754 }
1755
1756 //read the calibration and calculate its integer value
1757 drsFile->GetNextRow();
1758 for (uint32_t i=0;i<1440*1024;i++)
1759 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1760
1761
1762 //get the start cells offsets
1763 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1764 if (it->first == "StartCellData")
1765 {
1766 startCellOffset = it->second.offset;
1767 if (it->second.type != 'I')
1768 {
1769 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1770 return -1;
1771 }
1772 }
1773
1774 if (startCellOffset == -1)
1775 {
1776 cout << "WARNING: Could not find StartCellData in input file " << fileNameIn << ". Doing it uncalibrated"<< endl;
1777 }
1778 else
1779 {
1780 //assign it to the ouput file
1781 outFile.setDrsCalib(drsCalib16);
1782 }
1783 }
1784
1785 /************************************************************************************
1786 * Done configuring compression. Do the real job now !
1787 ************************************************************************************/
1788
1789 if (displayText) cout << "Converting file..." << endl;
1790
1791 int numSlices = -1;
1792 int32_t dataOffset = -1;
1793
1794 //Get the pointer to the column that must be drs-calibrated
1795 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1796 if (it->first == "Data")
1797 {
1798 numSlices = it->second.num;
1799 dataOffset = it->second.offset;
1800 }
1801 if (numSlices % 1440 != 0)
1802 {
1803 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1804 return -1;
1805 }
1806 if (dataOffset == -1)
1807 {
1808 cout << "Could not find the column Data in the input file. Aborting." << endl;
1809 return -1;
1810 }
1811
1812 numSlices /= 1440;
1813
1814 //set pointers to the readout data to later be able to gather it to "buffer".
1815 vector<void*> readPointers;
1816 vector<int32_t> readOffsets;
1817 vector<int32_t> readElemSize;
1818 vector<int32_t> readNumElems;
1819 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1820 {
1821 readPointers.push_back(inFile.SetPtrAddress(it->first));
1822 readOffsets.push_back(it->second.offset);
1823 readElemSize.push_back(it->second.size);
1824 readNumElems.push_back(it->second.num);
1825 }
1826
1827 //Convert each row one after the other
1828 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1829 {
1830 if (displayText) cout << "\r Row " << i+1 << flush;
1831 inFile.GetNextRow();
1832 //copy from inFile internal structures to buffer
1833 int32_t count=0;
1834 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1835 {
1836 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1837 count++;
1838 }
1839
1840 if (startCellOffset != -1)
1841 {//drs calibrate this data
1842 for (int j=0;j<1440;j++)
1843 {
1844 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1845 if (thisStartCell < 0) continue;
1846 for (int k=0;k<numSlices;k++)
1847 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1848 }
1849 }
1850 outFile.writeBinaryRow(buffer);
1851 };
1852
1853 //Get table name for later use in case the compressed file is to be verified
1854 string tableName = inFile.GetStr("EXTNAME");
1855
1856 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1857
1858 inFile.close();
1859 if (!outFile.close())
1860 {
1861 cout << "Something went wrong while writing the catalog: negative index" << endl;
1862 return false;
1863 }
1864
1865 if (drsCalib16 != NULL)
1866 delete[] drsCalib16;
1867
1868 if (displayText) cout << "Done." << endl;
1869
1870 /************************************************************************************
1871 * Actual job done. Should we verify what we did ?
1872 ************************************************************************************/
1873
1874 if (verifyDataPlease)
1875 {
1876 if (displayText) cout << "Now verify data..." << endl;
1877 }
1878 else
1879 return 0;
1880
1881 //get a compressed reader
1882//TEMP try to copy the file too
1883// string copyName("/gpfs/scratch/fact/etienne/copyFile.fz");
1884// string copyName(fileNameOut+".copy");
1885 string copyName("");
1886 factfits verifyFile(fileNameOut, copyName, tableName, false);
1887
1888 //and the header of the compressed file
1889 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1890
1891 //get a non-compressed writer
1892 ofits reconstructedFile;
1893
1894 //figure out its name: /dev/null unless otherwise specified
1895 string reconstructedName = fileNameOut+".recons";
1896// reconstructedName = "/dev/null";
1897 reconstructedFile.open(reconstructedName.c_str(), false);
1898
1899 //reconstruct the original columns from the compressed file.
1900 string origChecksumStr;
1901 string origDatasum;
1902
1903 //reset tablename value so that it is re-read from compressed table's header
1904 tableName = "";
1905
1906 /************************************************************************************
1907 * Reconstruction setup done. Rebuild original header
1908 ************************************************************************************/
1909
1910 //re-tranlate the keys
1911 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1912 {
1913 string k = it->first;
1914 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1915 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1916 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1917 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1918 {
1919 continue;
1920 }
1921
1922 if (k == "ZCHKSUM")
1923 {
1924 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1925 origChecksumStr = it->second.value;
1926 continue;
1927 }
1928
1929 if (k == "ZDTASUM")
1930 {
1931 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1932 origDatasum = it->second.value;
1933 continue;
1934 }
1935
1936 if (k == "EXTNAME")
1937 {
1938 tableName = it->second.value;
1939 }
1940
1941 k = k.substr(0,5);
1942
1943 if (k == "TTYPE")
1944 {//we have an original column name here.
1945 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1946 continue;
1947 }
1948
1949 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1950 {
1951 continue;
1952 }
1953
1954 if (k == "ZFORM" || k == "ZTYPE")
1955 {
1956 string tmpKey = it->second.fitsString;
1957 tmpKey[0] = 'T';
1958 reconstructedFile.SetKeyFromFitsString(tmpKey);
1959 continue;
1960 }
1961
1962 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1963 }
1964
1965 if (tableName == "")
1966 {
1967 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1968 return -1;
1969 }
1970
1971 //Restore the original columns
1972 for (uint32_t numCol=1; numCol<10000; numCol++)
1973 {
1974 ostringstream str;
1975 str << numCol;
1976 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
1977
1978 string ttype = verifyFile.GetStr("TTYPE"+str.str());
1979 string tform = verifyFile.GetStr("ZFORM"+str.str());
1980 char type = tform[tform.size()-1];
1981 string number = tform.substr(0, tform.size()-1);
1982 int numElems = atoi(number.c_str());
1983
1984 if (number == "") numElems=1;
1985
1986 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1987 }
1988
1989 reconstructedFile.WriteTableHeader(tableName.c_str());
1990
1991 /************************************************************************************
1992 * Original header restored. Do the data
1993 ************************************************************************************/
1994
1995 //set pointers to the readout data to later be able to gather it to "buffer".
1996 readPointers.clear();
1997 readOffsets.clear();
1998 readElemSize.clear();
1999 readNumElems.clear();
2000 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2001 {
2002 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
2003 readOffsets.push_back(it->second.offset);
2004 readElemSize.push_back(it->second.size);
2005 readNumElems.push_back(it->second.num);
2006 }
2007
2008 //do the actual reconstruction work
2009 uint32_t i=1;
2010 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2011 {
2012 int count=0;
2013 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2014 {
2015 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2016 count++;
2017 }
2018 if (displayText) cout << "\r Row " << i << flush;
2019 reconstructedFile.WriteRow(buffer, rowWidth);
2020 i++;
2021 }
2022
2023 if (displayText) cout << endl;
2024
2025 //close reconstruction input and output
2026// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2027// verifyFile.close();
2028 reconstructedFile.close();
2029
2030 //get original and reconstructed checksum and datasum
2031 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2032 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2033
2034 //verify that no mistake was made
2035 if (origChecksum.second != newChecksum.second)
2036 {
2037 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2038 return -1;
2039 }
2040 if (origChecksum.first != newChecksum.first)
2041 {
2042 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2043 }
2044 else
2045 {
2046 if (true) cout << "Ok" << endl;
2047 }
2048
2049 delete[] buffer;
2050 return 0;
2051
2052}
2053
Note: See TracBrowser for help on using the repository browser.