source: trunk/FACT++/src/fitsCompressor.cc@ 16991

Last change on this file since 16991 was 16944, checked in by lyard, 11 years ago
Fixed FITS conformance issue for .fz files when numrows=0
File size: 87.9 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=zfits::kOrderByRow,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 bool changeHeaderKey(const string& origName, const string& newName);
427
428 ///@brief open a new fits file
429 bool open(const string& fileName, const string& tableName="Data");
430
431 ///@brief close the opened file
432 bool close();
433
434 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
435 bool writeBinaryRow(const char* bufferToWrite);
436
437 uint32_t getRowWidth();
438
439 ///@brief assign a given (already loaded) drs calibration
440 void setDrsCalib(int16_t* data);
441
442 ///@brief set the number of worker threads compressing the data.
443 bool setNumWorkingThreads(uint32_t num);
444
445 private:
446 ///@brief compresses one buffer of data, the one given by threadIndex
447 uint64_t compressBuffer(uint32_t threadIndex);
448
449 ///@brief writes an already compressed buffer to disk
450 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
451
452 ///@brief add the header checksum to the datasum
453 void addHeaderChecksum(Checksum& checksum);
454
455 ///@brief write the header. If closingFile is set to true, checksum is calculated
456 void writeHeader(bool closingFile = false);
457
458 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
459 void writeCatalog(bool closingFile=false);
460
461 /// FIXME this was a bad idea. Move everything to the regular header
462 vector<HeaderEntry> _defaultHeader;
463
464 /// the main function compressing the data
465 static void* threadFunction(void* context);
466
467 /// Write the drs calibration to disk, if any
468 void writeDrsCalib();
469
470 /// Copy and transpose (or semi-transpose) one tile of data
471 void copyTransposeTile(uint32_t index);
472
473 /// Specific compression functions
474 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
475 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
476 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
477 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
478
479 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
480 int16_t* _drsCalibData; ///< array of the Drs baseline mean
481 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
482 pthread_mutex_t _mutex; ///< mutex for compressing threads
483
484
485 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
486 static string _fitsHeader; ///< the default header to be written in every fits file
487};
488
489const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
490const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
491const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
492const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
493const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
494const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
495
496template<>
497void CompressedFitsFile::HeaderEntry::setValue(const string& v)
498{
499 string val = v;
500 if (val.size() > 2 && val[0] == '\'')
501 {
502 size_t pos = val.find_last_of("'");
503 if (pos != string::npos && pos != 0)
504 val = val.substr(1, pos-1);
505 }
506 ostringstream str;
507
508 str << "'" << val << "'";
509 for (int i=str.str().length(); i<20;i++)
510 str << " ";
511 _value = str.str();
512 buildFitsString();
513}
514
515/**
516 * Default header to be written in all fits files
517 */
518string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
519 "BITPIX = 8 / number of bits per data pixel "
520 "NAXIS = 0 / number of data axes "
521 "EXTEND = T / FITS dataset may contain extensions "
522 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
523 "DATASUM = ' 0' / Checksum for the data block "
524 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
525 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
526 "END "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " "
550 " "
551 " "
552 " "
553 " ";
554
555/**
556 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
557 *
558 */
559string CompressedFitsWriter::_emptyBlock = " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " "
591 " "
592 " "
593 " "
594 " ";
595
596CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
597/****************************************************************
598 * SUPER CLASS DEFAULT CONSTRUCTOR
599 ****************************************************************/
600CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
601 uint32_t numRowsPerTile) : _header(),
602 _columns(),
603 _numTiles(numTiles),
604 _numRowsPerTile(numRowsPerTile),
605 _totalNumRows(0),
606 _rowWidth(0),
607 _headerFlushed(false),
608 _buffer(NULL),
609 _checksum(0),
610 _heapPtr(0),
611 _transposedBuffer(1),
612 _compressedBuffer(1),
613 _numThreads(1),
614 _threadIndex(0),
615 _thread(1),
616 _threadNumRows(1),
617 _threadStatus(1)
618{
619 _catalog.resize(_numTiles);
620 _transposedBuffer[0] = NULL;
621 _compressedBuffer[0] = NULL;
622 _threadStatus[0] = _THREAD_WAIT_;
623 _threadNumRows[0] = 0;
624}
625
626/****************************************************************
627 * SUPER CLASS DEFAULT DESTRUCTOR
628 ****************************************************************/
629CompressedFitsFile::~CompressedFitsFile()
630{
631 if (_buffer != NULL)
632 {
633 _buffer = _buffer-4;
634 delete[] _buffer;
635 _buffer = NULL;
636 for (uint32_t i=0;i<_numThreads;i++)
637 {
638 _compressedBuffer[i] = _compressedBuffer[i]-4;
639 delete[] _transposedBuffer[i];
640 delete[] _compressedBuffer[i];
641 _transposedBuffer[i] = NULL;
642 _compressedBuffer[i] = NULL;
643 }
644 }
645 if (_file.is_open())
646 _file.close();
647}
648
649/****************************************************************
650 * REALLOCATE BUFFER
651 ****************************************************************/
652bool CompressedFitsFile::reallocateBuffers()
653{
654 if (_buffer)
655 {
656 _buffer = _buffer - 4;
657 delete[] _buffer;
658 for (uint32_t i=0;i<_compressedBuffer.size();i++)
659 {
660 _compressedBuffer[i] = _compressedBuffer[i]-4;
661 delete[] _transposedBuffer[i];
662 delete[] _compressedBuffer[i];
663 }
664 }
665 _buffer = new char[_rowWidth*_numRowsPerTile + 12];
666 if (_buffer == NULL) return false;
667 memset(_buffer, 0, 4);
668 _buffer = _buffer + 4;
669 if (_compressedBuffer.size() != _numThreads)
670 {
671 _transposedBuffer.resize(_numThreads);
672 _compressedBuffer.resize(_numThreads);
673 }
674 for (uint32_t i=0;i<_numThreads;i++)
675 {
676 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
677 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader) + 12]; //use a bit more memory for compression flags and checksumming
678 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
679 return false;
680 //shift the compressed buffer by 4 bytes, for checksum calculation
681 memset(_compressedBuffer[i], 0, 4);
682 _compressedBuffer[i] = _compressedBuffer[i]+4;
683 //initialize the tile header
684 TileHeader tileHeader;
685 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
686 }
687 return true;
688}
689
690/****************************************************************
691 * DEFAULT WRITER CONSTRUCTOR
692 ****************************************************************/
693CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
694 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
695 _checkOffset(0),
696 _drsCalibData(NULL),
697 _threadLooper(0)
698{
699 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
700 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
701 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
702 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
703 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
704 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
705 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
706 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
707 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
708 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
709 //compression stuff
710 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
711 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
712 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
713 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
714 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
715 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
716 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
717
718 pthread_mutex_init(&_mutex, NULL);
719}
720
721/****************************************************************
722 * DEFAULT DESTRUCTOR
723 ****************************************************************/
724CompressedFitsWriter::~CompressedFitsWriter()
725{
726 pthread_mutex_destroy(&_mutex);
727}
728
729/****************************************************************
730 * SET THE POINTER TO THE DRS CALIBRATION
731 ****************************************************************/
732void CompressedFitsWriter::setDrsCalib(int16_t* data)
733{
734 _drsCalibData = data;
735}
736
737/****************************************************************
738 * SET NUM WORKING THREADS
739 ****************************************************************/
740bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
741{
742 if (_file.is_open())
743 return false;
744 if (num < 1 || num > 64)
745 {
746 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
747 return false;
748 }
749 _numThreads = num;
750 _transposedBuffer[0] = NULL;
751 _compressedBuffer[0] = NULL;
752 _threadStatus.resize(num);
753 _thread.resize(num);
754 _threadNumRows.resize(num);
755 for (uint32_t i=0;i<num;i++)
756 {
757 _threadNumRows[i] = 0;
758 _threadStatus[i] = _THREAD_WAIT_;
759 }
760 return reallocateBuffers();
761}
762
763/****************************************************************
764 * WRITE DRS CALIBRATION TO FILE
765 ****************************************************************/
766void CompressedFitsWriter::writeDrsCalib()
767{
768 //if file was not loaded, ignore
769 if (_drsCalibData == NULL)
770 return;
771 uint64_t whereDidIStart = _file.tellp();
772 vector<HeaderEntry> header;
773 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
774 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
775 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
776 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
777 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
778 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
779 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
780 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
781 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
782 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
783 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
784 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
785 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
786
787 for (uint32_t i=0;i<header.size();i++)
788 _file.write(header[i].fitsString().c_str(), 80);
789 //End the header
790 _file.write("END ", 80);
791 long here = _file.tellp();
792 if (here%2880)
793 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
794 //now write the data itself
795 int16_t* swappedBytes = new int16_t[1024];
796 Checksum checksum;
797 for (int32_t i=0;i<1440;i++)
798 {
799 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
800 for (int32_t j=0;j<2048;j+=2)
801 {
802 int8_t inter;
803 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
804 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
805 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
806 }
807 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
808 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
809 }
810 uint64_t whereDidIStop = _file.tellp();
811 delete[] swappedBytes;
812 //No need to pad the data, as (1440*1024*2)%2880==0
813
814 //calculate the checksum from the header
815 ostringstream str;
816 str << checksum.val();
817 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
818 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
819 checksum.add(it->fitsString().c_str(), 80);
820 string end("END ");
821 string space(" ");
822 checksum.add(end.c_str(), 80);
823 int headerRowsLeft = 36 - (header.size() + 1)%36;
824 for (int i=0;i<headerRowsLeft;i++)
825 checksum.add(space.c_str(), 80);
826 //udpate the checksum keyword
827 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
828 //and eventually re-write the header data
829 _file.seekp(whereDidIStart);
830 for (uint32_t i=0;i<header.size();i++)
831 _file.write(header[i].fitsString().c_str(), 80);
832 _file.seekp(whereDidIStop);
833}
834
835/****************************************************************
836 * ADD COLUMN
837 ****************************************************************/
838bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
839{
840 if (_totalNumRows != 0)
841 {
842 cout << "Error: cannot add new columns once first row has been written" << endl;
843 return false;
844 }
845 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
846 {
847 if (it->name() == column.name())
848 {
849 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
850 return false;
851 }
852 }
853 _columns.push_back(column);
854 _columns.back().setOffset(_rowWidth);
855 _rowWidth += column.width();
856 reallocateBuffers();
857
858 ostringstream str, str2, str3;
859 str << "TTYPE" << _columns.size();
860 str2 << column.name();
861 str3 << "label for field ";
862 if (_columns.size() < 10) str3 << " ";
863 if (_columns.size() < 100) str3 << " ";
864 str3 << _columns.size();
865 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
866 str.str("");
867 str2.str("");
868 str3.str("");
869 str << "TFORM" << _columns.size();
870 str2 << "1QB";
871 str3 << "data format of field " << _columns.size();
872 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
873 str.str("");
874 str2.str("");
875 str3.str("");
876 str << "ZFORM" << _columns.size();
877 str2 << column.numElems() << column.type();
878 str3 << "Original format of field " << _columns.size();
879 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
880 str.str("");
881 str2.str("");
882 str3.str("");
883 str << "ZCTYP" << _columns.size();
884 str2 << column.getCompressionString();
885 str3 << "Comp. Scheme of field " << _columns.size();
886 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
887 //resize the catalog vector accordingly
888 for (uint32_t i=0;i<_numTiles;i++)
889 {
890 _catalog[i].resize(_columns.size());
891 for (uint32_t j=0;j<_catalog[i].size();j++)
892 _catalog[i][j] = make_pair(0,0);
893 }
894 return true;
895}
896
897/****************************************************************
898 * SET HEADER KEY
899 ****************************************************************/
900bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
901{
902 HeaderEntry ent = entry;
903 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
904 {
905 if (it->key() == entry.key())
906 {
907 if (entry.comment() == "")
908 ent.setComment(it->comment());
909 (*it) = ent;
910 _headerFlushed = false;
911 return true;
912 }
913 }
914 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
915 {
916 if (it->key() == entry.key())
917 {
918 if (entry.comment() == "")
919 ent.setComment(it->comment());
920 (*it) = ent;
921 _headerFlushed = false;
922 return true;
923 }
924 }
925 if (_totalNumRows != 0)
926 {
927 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
928 return false;
929 }
930 _header.push_back(entry);
931 _headerFlushed = false;
932 return true;
933}
934
935bool CompressedFitsWriter::changeHeaderKey(const string& origName, const string& newName)
936{
937 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
938 {
939 if (it->key() == origName)
940 {
941 (*it) = HeaderEntry(newName, it->value(), it->comment());
942 _headerFlushed = false;
943 return true;
944 }
945 }
946 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
947 {
948 if (it->key() == origName)
949 {
950 (*it) = HeaderEntry(newName, it->value(), it->comment());
951 _headerFlushed = false;
952 return true;
953 }
954 }
955 return false;
956}
957/****************************************************************
958 * OPEN
959 ****************************************************************/
960bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
961{
962 _file.open(fileName.c_str(), ios_base::out);
963 if (!_file.is_open())
964 {
965 cout << "Error: Could not open the file (" << fileName << ")." << endl;
966 return false;
967 }
968 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
969 _headerFlushed = false;
970 _threadIndex = 0;
971 //create requested number of threads
972 for (uint32_t i=0;i<_numThreads;i++)
973 pthread_create(&(_thread[i]), NULL, threadFunction, this);
974 //wait for the threads to start
975 while (_numThreads != _threadIndex)
976 usleep(1000);
977 //set the writing fence to the last thread
978 _threadIndex = _numThreads-1;
979 return (_file.good());
980}
981
982/****************************************************************
983 * WRITE HEADER
984 ****************************************************************/
985void CompressedFitsWriter::writeHeader(bool closingFile)
986{
987 if (_headerFlushed)
988 return;
989 if (!_file.is_open())
990 return;
991
992 long cPos = _file.tellp();
993
994 _file.seekp(0);
995
996 _file.write(_fitsHeader.c_str(), 2880);
997
998 //Write the DRS calib table here !
999 writeDrsCalib();
1000
1001 //we are now at the beginning of the main table. Write its header
1002 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
1003 _file.write(it->fitsString().c_str(), 80);
1004
1005 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1006 _file.write(it->fitsString().c_str(), 80);
1007
1008 _file.write("END ", 80);
1009 long here = _file.tellp();
1010 if (here%2880)
1011 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
1012
1013 _headerFlushed = true;
1014
1015 here = _file.tellp();
1016
1017 if (here%2880)
1018 cout << "Error: seems that header did not finish at the end of a block." << endl;
1019
1020 if (here > cPos && cPos != 0)
1021 {
1022 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
1023 return;
1024 }
1025
1026 here = _file.tellp();
1027 writeCatalog(closingFile);
1028
1029 here = _file.tellp() - here;
1030 _heapPtr = here;
1031
1032 if (cPos != 0)
1033 _file.seekp(cPos);
1034}
1035
1036/****************************************************************
1037 * WRITE CATALOG
1038 * WARNING: writeCatalog is only meant to be used by writeHeader.
1039 * external usage will most likely corrupt the file
1040 ****************************************************************/
1041void CompressedFitsWriter::writeCatalog(bool closingFile)
1042{
1043 uint32_t sizeWritten = 0;
1044 for (uint32_t i=0;i<_catalog.size();i++)
1045 {
1046 for (uint32_t j=0;j<_catalog[i].size();j++)
1047 {
1048 //swap the bytes
1049 int8_t swappedEntry[16];
1050 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1051 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1052 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1053 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1054 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1055 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1056 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1057 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1058
1059 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1060 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1061 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1062 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1063 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1064 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1065 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1066 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1067 if (closingFile)
1068 {
1069 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1070 }
1071 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1072 sizeWritten += 2*sizeof(int64_t);
1073 }
1074 }
1075
1076 //we do not reserve space for now because fverify does not like that.
1077 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1078 return ;
1079
1080 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1081 if (sizeWritten % 2880 != 0)
1082 {
1083 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1084 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1085 }
1086}
1087
1088/****************************************************************
1089 * ADD HEADER CHECKSUM
1090 ****************************************************************/
1091void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1092{
1093 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1094 _checksum.add(it->fitsString().c_str(), 80);
1095 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1096 _checksum.add(it->fitsString().c_str(), 80);
1097 string end("END ");
1098 string space(" ");
1099 checksum.add(end.c_str(), 80);
1100 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1101 for (int i=0;i<headerRowsLeft;i++)
1102 checksum.add(space.c_str(), 80);
1103}
1104
1105/****************************************************************
1106 * CLOSE
1107 ****************************************************************/
1108bool CompressedFitsWriter::close()
1109{
1110 for (uint32_t i=0;i<_numThreads;i++)
1111 while (_threadStatus[i] != _THREAD_WAIT_)
1112 usleep(100000);
1113 for (uint32_t i=0;i<_numThreads;i++)
1114 _threadStatus[i] = _THREAD_EXIT_;
1115 for (uint32_t i=0;i<_numThreads;i++)
1116 pthread_join(_thread[i], NULL);
1117 //flush the rows that were not written yet
1118 if (_totalNumRows%_numRowsPerTile != 0)
1119 {
1120 copyTransposeTile(0);
1121
1122 _threadNumRows[0] = _totalNumRows;
1123 uint32_t numBytes = compressBuffer(0);
1124 writeCompressedDataToDisk(0, numBytes);
1125 }
1126 //compression stuff
1127 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1128 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1129 //TODO calculate the real offset from the main table to the start of the HEAP data area
1130 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1131 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1132
1133 //regular stuff
1134 if (_catalog.size() > 0)
1135 {
1136 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1137 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1138 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1139 int64_t heapSize = 0;
1140 int64_t compressedOffset = 0;
1141 for (uint32_t i=0;i<_catalog.size();i++)
1142 {
1143 compressedOffset += sizeof(TileHeader);
1144 heapSize += sizeof(TileHeader);
1145 for (uint32_t j=0;j<_catalog[i].size();j++)
1146 {
1147 heapSize += _catalog[i][j].first;
1148// cout << "heapSize: " << heapSize << endl;
1149 //set the catalog offsets to their actual values
1150 _catalog[i][j].second = compressedOffset;
1151 compressedOffset += _catalog[i][j].first;
1152 //special case if entry has zero length
1153 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1154 }
1155 }
1156 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1157 }
1158 else
1159 {
1160 setHeaderKey(HeaderEntry("NAXIS1", _columns.size()*2*sizeof(int64_t), "width of table in bytes"));
1161 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1162 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1163 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1164 changeHeaderKey("THEAP", "ZHEAP");
1165 }
1166 ostringstream str;
1167
1168 writeHeader(true);
1169
1170 str.str("");
1171 str << _checksum.val();
1172
1173 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1174 addHeaderChecksum(_checksum);
1175 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1176 //update header value
1177 writeHeader();
1178 //update file length
1179 long here = _file.tellp();
1180 if (here%2880)
1181 {
1182 vector<char> nullVec(2880 - here%2880, 0);
1183 _file.write(nullVec.data(), 2880 - here%2880);
1184 }
1185 _file.close();
1186 return true;
1187}
1188
1189/****************************************************************
1190 * COPY TRANSPOSE TILE
1191 ****************************************************************/
1192void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1193{
1194 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1195
1196 //copy the tile and transpose it
1197 uint32_t offset = 0;
1198 for (uint32_t i=0;i<_columns.size();i++)
1199 {
1200 switch (_columns[i].getColumnOrdering())//getCompression())
1201 {
1202 case zfits::kOrderByRow:
1203 for (uint32_t k=0;k<thisRoundNumRows;k++)
1204 {//regular, "semi-transposed" copy
1205 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1206 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1207 }
1208 break;
1209
1210 case zfits::kOrderByCol :
1211 for (int j=0;j<_columns[i].numElems();j++)
1212 for (uint32_t k=0;k<thisRoundNumRows;k++)
1213 {//transposed copy
1214 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1215 offset += _columns[i].sizeOfElems();
1216 }
1217 break;
1218 default:
1219 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1220
1221 };
1222 }
1223}
1224
1225/****************************************************************
1226 * WRITE BINARY ROW
1227 ****************************************************************/
1228bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1229{
1230 if (_totalNumRows == 0)
1231 writeHeader();
1232
1233 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1234 _totalNumRows++;
1235 if (_totalNumRows%_numRowsPerTile == 0)
1236 {
1237 //which is the next thread that we should use ?
1238 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1239 usleep(100000);
1240
1241 copyTransposeTile(_threadLooper);
1242
1243 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1244 usleep(100000);
1245
1246 _threadNumRows[_threadLooper] = _totalNumRows;
1247 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1248 _threadLooper = (_threadLooper+1)%_numThreads;
1249 }
1250 return _file.good();
1251}
1252
1253uint32_t CompressedFitsWriter::getRowWidth()
1254{
1255 return _rowWidth;
1256}
1257
1258/****************************************************************
1259 * COMPRESS BUFFER
1260 ****************************************************************/
1261uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1262{
1263 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1264 return numRows*sizeOfElems*numRowElems;
1265}
1266
1267/****************************************************************
1268 * COMPRESS BUFFER
1269 ****************************************************************/
1270uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1271{
1272 string huffmanOutput;
1273 uint32_t previousHuffmanSize = 0;
1274 if (numRows < 2)
1275 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1276 return numRows*sizeOfElems*numRowElems + 1000;
1277 }
1278 if (sizeOfElems < 2 )
1279 {
1280 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1281 return 0;
1282 }
1283 uint32_t huffmanOffset = 0;
1284 for (uint32_t j=0;j<numRowElems;j++)
1285 {
1286 Huffman::Encode(huffmanOutput,
1287 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1288 numRows*(sizeOfElems/2));
1289 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1290 huffmanOffset += sizeof(uint32_t);
1291 previousHuffmanSize = huffmanOutput.size();
1292 }
1293 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1294
1295 //only copy if not larger than not-compressed size
1296 if (totalSize < numRows*sizeOfElems*numRowElems)
1297 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1298
1299 return totalSize;
1300}
1301
1302/****************************************************************
1303 * COMPRESS BUFFER
1304 ****************************************************************/
1305uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1306{
1307 uint32_t colWidth = numRowElems;
1308 for (int j=colWidth*numRows-1;j>1;j--)
1309 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1310 //call the huffman transposed
1311 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1312}
1313
1314uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1315{
1316 uint32_t colWidth = numRowElems;
1317 for (int j=colWidth*numRows-1;j>1;j--)
1318 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1319
1320 return numRows*sizeOfElems*numRowElems;
1321}
1322/****************************************************************
1323 * COMPRESS BUFFER
1324 ****************************************************************/
1325uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1326{
1327 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1328 uint32_t offset=0;
1329 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1330 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1331
1332 //now compress each column one by one by calling compression on arrays
1333 for (uint32_t i=0;i<_columns.size();i++)
1334 {
1335 _catalog[currentCatalogRow][i].second = compressedOffset;
1336
1337 if (_columns[i].numElems() == 0) continue;
1338
1339 BlockHeader& head = _columns[i].getBlockHeader();
1340 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1341 //set the default byte telling if uncompressed the compressed Flag
1342 uint64_t previousOffset = compressedOffset;
1343 //skip header data
1344 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1345
1346 for (uint32_t j=0;j<sequence.size(); j++)
1347 {
1348 switch (sequence[j])
1349 {
1350 case zfits::kFactRaw:
1351// if (head.numProcs == 1)
1352 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1353 break;
1354 case zfits::kFactSmoothing:
1355 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1356 break;
1357 case zfits::kFactHuffman16:
1358 if (head.ordering == zfits::kOrderByCol)
1359 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1360 else
1361 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1362 break;
1363 default:
1364 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1365 break;
1366 }
1367 }
1368
1369 //check if compressed size is larger than uncompressed
1370 if (sequence[0] != zfits::kFactRaw &&
1371 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1372 {//if so set flag and redo it uncompressed
1373 cout << "REDOING UNCOMPRESSED" << endl;
1374 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1375 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1376 BlockHeader he;
1377 he.size = compressedOffset - previousOffset;
1378 he.numProcs = 1;
1379 he.ordering = zfits::kOrderByRow;
1380 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1381 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
1382 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1383 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1384 continue;
1385 }
1386 head.size = compressedOffset - previousOffset;
1387 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1388 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1389
1390 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1391 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1392 }
1393
1394 TileHeader tHead(thisRoundNumRows, compressedOffset);
1395 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1396 return compressedOffset;
1397}
1398
1399/****************************************************************
1400 * WRITE COMPRESS DATA TO DISK
1401 ****************************************************************/
1402bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1403{
1404 char* checkSumPointer = _compressedBuffer[threadID];
1405 int32_t extraBytes = 0;
1406 uint32_t sizeToChecksum = sizeToWrite;
1407 if (_checkOffset != 0)
1408 {//should we extend the array to the left ?
1409 sizeToChecksum += _checkOffset;
1410 checkSumPointer -= _checkOffset;
1411 memset(checkSumPointer, 0, _checkOffset);
1412 }
1413 if (sizeToChecksum%4 != 0)
1414 {//should we extend the array to the right ?
1415 extraBytes = 4 - (sizeToChecksum%4);
1416 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1417 sizeToChecksum += extraBytes;
1418 }
1419 //do the checksum
1420 _checksum.add(checkSumPointer, sizeToChecksum);
1421// cout << endl << "Checksum: " << _checksum.val() << endl;
1422 _checkOffset = (4 - extraBytes)%4;
1423 //write data to disk
1424 _file.write(_compressedBuffer[threadID], sizeToWrite);
1425 return _file.good();
1426}
1427
1428/****************************************************************
1429 * WRITER THREAD LOOP
1430 ****************************************************************/
1431void* CompressedFitsWriter::threadFunction(void* context)
1432{
1433 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1434
1435 uint32_t myID = 0;
1436 pthread_mutex_lock(&(myself->_mutex));
1437 myID = myself->_threadIndex++;
1438 pthread_mutex_unlock(&(myself->_mutex));
1439 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1440
1441 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1442 {
1443 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1444 usleep(100000);
1445 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1446 continue;
1447 uint32_t numBytes = myself->compressBuffer(myID);
1448 myself->_threadStatus[myID] = _THREAD_WRITE_;
1449
1450 //wait for the previous data to be written
1451 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1452 usleep(1000);
1453 //do the actual writing to disk
1454 pthread_mutex_lock(&(myself->_mutex));
1455 myself->writeCompressedDataToDisk(myID, numBytes);
1456 myself->_threadIndex = myID;
1457 pthread_mutex_unlock(&(myself->_mutex));
1458 myself->_threadStatus[myID] = _THREAD_WAIT_;
1459 }
1460 return NULL;
1461}
1462
1463/****************************************************************
1464 * PRINT USAGE
1465 ****************************************************************/
1466void printUsage()
1467{
1468 cout << endl;
1469 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1470 " and compresses it.\n It can use a drs calibration in order to"
1471 " improve the compression ratio. If so, the input calibration"
1472 " is embedded into the compressed file.\n"
1473 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1474 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1475 "Usage: Compressed_Fits_Test <inputFile>";
1476 cout << endl;
1477}
1478
1479/****************************************************************
1480 * PRINT HELP
1481 ****************************************************************/
1482void printHelp()
1483{
1484 cout << endl;
1485 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1486 "The fz extension will be added, replacing the .gz one if required \n"
1487 "If output is specified, then it will replace the automatically generated output filename\n"
1488 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1489 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1490 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1491 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1492 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1493 "UNCOMPRESSED\n"
1494 "AMPLITUDE\n"
1495 "HUFFMAN\n"
1496 "SMOOTHMAN\n"
1497 "INT_WAVELET\n"
1498 "\n"
1499 "--quiet removes any textual output, except error messages\n"
1500 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1501 ;
1502 cout << endl << endl;
1503}
1504
1505/****************************************************************
1506 * SETUP CONFIGURATION
1507 ****************************************************************/
1508void setupConfiguration(Configuration& conf)
1509{
1510 po::options_description configs("FitsCompressor options");
1511 configs.add_options()
1512 ("inputFile,i", vars<string>(), "Input file")
1513 ("drs,d", var<string>(), "Input drs calibration file")
1514 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1515 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1516 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1517 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1518 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1519 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1520 ;
1521 po::positional_options_description positional;
1522 positional.add("inputFile", -1);
1523 conf.AddOptions(configs);
1524 conf.SetArgumentPositions(positional);
1525}
1526
1527/****************************************************************
1528 * MAIN
1529 ****************************************************************/
1530int main(int argc, const char** argv)
1531{
1532 Configuration conf(argv[0]);
1533 conf.SetPrintUsage(printUsage);
1534 setupConfiguration(conf);
1535
1536 if (!conf.DoParse(argc, argv, printHelp))
1537 return -1;
1538
1539 //initialize the file names to nothing.
1540 string fileNameIn = "";
1541 string fileNameOut = "";
1542 string drsFileName = "";
1543 uint32_t numRowsPerTile = 100;
1544 bool displayText=true;
1545
1546 //parse configuration
1547 if (conf.Get<bool>("quiet")) displayText = false;
1548 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1549 if (inputFileNameVec.size() != 1)
1550 {
1551 cout << "Error: ";
1552 if (inputFileNameVec.size() == 0) cout << "no";
1553 else cout << inputFileNameVec.size();
1554 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1555 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1556 cout << inputFileNameVec[i] << endl;
1557 return -1;
1558 }
1559
1560 //Assign the input filename
1561 fileNameIn = inputFileNameVec[0];
1562
1563 //Check if we have a drs calib too
1564 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1565
1566 //Should we verify the data ?
1567 bool verifyDataPlease = false;
1568 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1569
1570
1571 //should we use a specific output filename ?
1572 if (conf.Has("output"))
1573 fileNameOut = conf.Get<string>("output");
1574 else
1575 {
1576 size_t pos = fileNameIn.find(".fits");
1577 if (pos == string::npos)
1578 {
1579 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1580 return -1;
1581 }
1582 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1583 }
1584
1585 //should we use specific compression on some columns ?
1586 const vector<string> columnsCompression = conf.Vec<string>("compression");
1587
1588 //split up values between column names and compression scheme
1589 vector<std::pair<string, string>> compressions;
1590 for (unsigned int i=0;i<columnsCompression.size();i++)
1591 {
1592 size_t pos = columnsCompression[i].find_first_of("=");
1593 if (pos == string::npos)
1594 {
1595 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1596 return -1;
1597 }
1598 string comp = columnsCompression[i].substr(pos+1);
1599 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1600 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1601 {
1602 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1603 return -1;
1604 }
1605 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1606 }
1607
1608 //How many rows per tile should we use ?
1609 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1610
1611 /************************************************************************************
1612 * Done reading configuration. Open relevant files
1613 ************************************************************************************/
1614
1615 //Open input's fits file
1616 factfits inFile(fileNameIn);
1617
1618 if (inFile.IsCompressedFITS())
1619 {
1620 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1621 return -1;
1622 }
1623
1624 //decide how many tiles should be put in the compressed file
1625 uint32_t originalNumRows = inFile.GetNumRows();
1626 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1627 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1628
1629 //should we use a specific number of threads for compressing ?
1630 unsigned int numThreads = 1;
1631 if (conf.Has("threads"))
1632 {
1633 numThreads = conf.Get<unsigned int>("threads");
1634 outFile.setNumWorkingThreads(numThreads);
1635 }
1636
1637 if (!outFile.open(fileNameOut))
1638 {
1639 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1640 return -1;
1641 }
1642
1643 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1644 factfits* drsFile = NULL;
1645 //try to open the Drs file. If any.
1646 if (drsFileName != "")
1647 {
1648 try
1649 {
1650 drsFile = new factfits(drsFileName);
1651 }
1652 catch (...)
1653 {
1654 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1655 return -1;
1656 }
1657 }
1658
1659 if (displayText)
1660 {
1661 cout << endl;
1662 cout << "**********************" << endl;
1663 cout << "Will compress from : " << fileNameIn << endl;
1664 cout << "to : " << fileNameOut << endl;
1665 if (drsFileName != "")
1666 cout << "while calibrating with: " << drsFileName << endl;
1667 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1668 cout << "Data will be verified : ";
1669 if (verifyDataPlease)
1670 cout << "yes" << endl;
1671 else
1672 cout << "no (WARNING !)" << endl;
1673 cout << "**********************" << endl;
1674 cout << endl;
1675 }
1676
1677 /************************************************************************************
1678 * Done opening input files. Allocate memory and configure output file
1679 ************************************************************************************/
1680
1681 //allocate the buffer for temporary storage of each read/written row
1682 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1683 char* buffer = new char[rowWidth + 12];
1684 memset(buffer, 0, 4);
1685 buffer = buffer+4;
1686
1687 //get the source columns
1688 const fits::Table::Columns& columns = inFile.GetColumns();
1689 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1690 if (displayText)
1691 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1692
1693 //Add columns.
1694 uint32_t totalRowWidth = 0;
1695 for (uint32_t i=0;i<sortedColumns.size(); i++)
1696 {
1697 //get column name
1698 ostringstream str;
1699 str << "TTYPE" << i+1;
1700 string colName = inFile.GetStr(str.str());
1701 if (displayText)
1702 {
1703 cout << "Column " << colName;
1704 for (uint32_t j=colName.size();j<21;j++)
1705 cout << " ";
1706 cout << " -> ";
1707 }
1708
1709 //get header structures
1710 BlockHeader rawHeader;
1711 BlockHeader smoothmanHeader(0, zfits::kOrderByRow, 2);
1712 vector<uint16_t> rawProcessings(1);
1713 rawProcessings[0] = zfits::kFactRaw;
1714 vector<uint16_t> smoothmanProcessings(2);
1715 smoothmanProcessings[0] = zfits::kFactSmoothing;
1716 smoothmanProcessings[1] = zfits::kFactHuffman16;
1717// smoothmanProcessings[2] = FACT_RAW;
1718
1719 totalRowWidth += sortedColumns[i].bytes;
1720
1721 //first lets see if we do have an explicit request
1722 bool explicitRequest = false;
1723 for (unsigned int j=0;j<compressions.size();j++)
1724 {
1725 if (compressions[j].first == colName)
1726 {
1727 explicitRequest = true;
1728 if (displayText) cout << compressions[j].second << endl;
1729 if (compressions[j].second == "UNCOMPRESSED")
1730 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1731 if (compressions[j].second == "SMOOTHMAN")
1732 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1733 break;
1734 }
1735 }
1736
1737 if (explicitRequest) continue;
1738
1739 if (colName != "Data")
1740 {
1741 if (displayText) cout << "UNCOMPRESSED" << endl;
1742 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1743 }
1744 else
1745 {
1746 if (displayText) cout << "SMOOTHMAN" << endl;
1747 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1748 }
1749 }
1750
1751 //translate original header entries to their Z-version
1752 const fits::Table::Keys& header = inFile.GetKeys();
1753 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1754 {
1755 string k = i->first;//header[i].key();
1756 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1757 continue;
1758 if (k == "CHECKSUM")
1759 {
1760 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1761 continue;
1762 }
1763 if (k == "DATASUM")
1764 {
1765 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1766 continue;
1767 }
1768 k = k.substr(0,5);
1769 if (k == "TTYPE" || k == "TFORM")
1770 {
1771 string tmpKey = i->second.fitsString;
1772 tmpKey[0] = 'Z';
1773 outFile.setHeaderKey(tmpKey);
1774 continue;
1775 }
1776 if (k == "NAXIS")
1777 continue;
1778 outFile.setHeaderKey(i->second.fitsString);
1779// cout << i->first << endl;
1780 }
1781
1782 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", " 0", "Checksum of raw littlen endian data"));
1783
1784 //deal with the DRS calib
1785 int16_t* drsCalib16 = NULL;
1786
1787 //load the drs calib. data
1788 int32_t startCellOffset = -1;
1789 if (drsFileName != "")
1790 {
1791 drsCalib16 = new int16_t[1440*1024];
1792 float* drsCalibFloat = NULL;
1793 try
1794 {
1795 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1796 }
1797 catch (...)
1798 {
1799 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1800 return -1;
1801 }
1802
1803 //read the calibration and calculate its integer value
1804 drsFile->GetNextRow();
1805 for (uint32_t i=0;i<1440*1024;i++)
1806 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1807
1808
1809 //get the start cells offsets
1810 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1811 if (it->first == "StartCellData")
1812 {
1813 startCellOffset = it->second.offset;
1814 if (it->second.type != 'I')
1815 {
1816 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1817 return -1;
1818 }
1819 }
1820
1821 if (startCellOffset == -1)
1822 {
1823 cout << "WARNING: Could not find StartCellData in input file " << fileNameIn << ". Doing it uncalibrated"<< endl;
1824 }
1825 else
1826 {
1827 //assign it to the ouput file
1828 outFile.setDrsCalib(drsCalib16);
1829 }
1830 }
1831
1832 /************************************************************************************
1833 * Done configuring compression. Do the real job now !
1834 ************************************************************************************/
1835
1836 if (displayText) cout << "Converting file..." << endl;
1837
1838 int numSlices = -1;
1839 int32_t dataOffset = -1;
1840
1841 //Get the pointer to the column that must be drs-calibrated
1842 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1843 if (it->first == "Data")
1844 {
1845 numSlices = it->second.num;
1846 dataOffset = it->second.offset;
1847 }
1848 if (numSlices % 1440 != 0)
1849 {
1850 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1851 return -1;
1852 }
1853 if (dataOffset == -1)
1854 {
1855 cout << "Could not find the column Data in the input file. Aborting." << endl;
1856 return -1;
1857 }
1858
1859 numSlices /= 1440;
1860
1861 //set pointers to the readout data to later be able to gather it to "buffer".
1862 vector<void*> readPointers;
1863 vector<int32_t> readOffsets;
1864 vector<int32_t> readElemSize;
1865 vector<int32_t> readNumElems;
1866 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1867 {
1868 readPointers.push_back(inFile.SetPtrAddress(it->first));
1869 readOffsets.push_back(it->second.offset);
1870 readElemSize.push_back(it->second.size);
1871 readNumElems.push_back(it->second.num);
1872 }
1873
1874 Checksum rawsum;
1875 //Convert each row one after the other
1876 ostringstream wrongChannels;
1877 map<int, int> wrongChannelsMap;
1878 for (uint32_t i=0;i<1440;i++)
1879 wrongChannelsMap[i] = 0;
1880 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1881 {
1882 if (displayText) cout << "\r Row " << i+1 << flush;
1883 inFile.GetNextRow();
1884 //copy from inFile internal structures to buffer
1885 int32_t count=0;
1886 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1887 {
1888 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1889 count++;
1890 }
1891
1892 char* checkSumPointer = buffer;
1893 const int chkOffset = (i*totalRowWidth)%4;
1894 checkSumPointer -= chkOffset;
1895 uint32_t sizeToChecksum = totalRowWidth + chkOffset;
1896 if (sizeToChecksum%4 != 0)
1897 {
1898 int32_t extraBytes = 4 - (sizeToChecksum%4);
1899 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
1900 sizeToChecksum += extraBytes;
1901 }
1902 rawsum.add(checkSumPointer, sizeToChecksum, false);
1903
1904 if (startCellOffset != -1)
1905 {//drs calibrate this data
1906 for (int j=0;j<1440;j++)
1907 {
1908 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1909 if (thisStartCell > 1023)
1910 {
1911 wrongChannelsMap[j]++;
1912 wrongChannels << j;
1913 }
1914 if (thisStartCell < 0) continue;
1915 for (int k=0;k<numSlices;k++)
1916 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1917 }
1918 }
1919 outFile.writeBinaryRow(buffer);
1920 };
1921
1922 if (wrongChannels.str() != "")
1923 {
1924 cout << "ERROR: Wrong channels: ";
1925 for (uint32_t i=0;i<1440;i++)
1926 {
1927 if (wrongChannelsMap[i] != 0)
1928 cout << i << "(" << wrongChannelsMap[i] << ") ";
1929 }
1930 cout << endl;
1931 exit(-1);
1932 }
1933 ostringstream strSum;
1934 strSum << rawsum.val();
1935 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", strSum.str(), "Checksum of raw littlen endian data"));
1936
1937 //Get table name for later use in case the compressed file is to be verified
1938 string tableName = inFile.GetStr("EXTNAME");
1939
1940 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1941
1942 inFile.close();
1943 if (!outFile.close())
1944 {
1945 cout << "Something went wrong while writing the catalog: negative index" << endl;
1946 return false;
1947 }
1948
1949 if (drsCalib16 != NULL)
1950 delete[] drsCalib16;
1951
1952 if (displayText) cout << "Done." << endl;
1953
1954 /************************************************************************************
1955 * Actual job done. Should we verify what we did ?
1956 ************************************************************************************/
1957
1958 if (verifyDataPlease)
1959 {
1960 if (displayText) cout << "Now verify data..." << endl;
1961 }
1962 else
1963 return 0;
1964
1965 //get a compressed reader
1966//TEMP try to copy the file too
1967// string copyName("/gpfs/scratch/fact/etienne/copyFile.fz");
1968// string copyName(fileNameOut+".copy");
1969 string copyName("");
1970 factfits verifyFile(fileNameOut, copyName, tableName, false);
1971
1972 //and the header of the compressed file
1973 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1974
1975 //get a non-compressed writer
1976 ofits reconstructedFile;
1977
1978 //figure out its name: /dev/null unless otherwise specified
1979 string reconstructedName = fileNameOut+".recons";
1980 reconstructedName = "/dev/null";
1981 reconstructedFile.open(reconstructedName.c_str(), false);
1982
1983 //reconstruct the original columns from the compressed file.
1984 string origChecksumStr;
1985 string origDatasum;
1986
1987 //reset tablename value so that it is re-read from compressed table's header
1988 tableName = "";
1989
1990 /************************************************************************************
1991 * Reconstruction setup done. Rebuild original header
1992 ************************************************************************************/
1993
1994 //re-tranlate the keys
1995 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1996 {
1997 string k = it->first;
1998 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1999 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
2000 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
2001 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER" || k == "ZHEAP")
2002 {
2003 continue;
2004 }
2005
2006 if (k == "ZCHKSUM")
2007 {
2008 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
2009 origChecksumStr = it->second.value;
2010 continue;
2011 }
2012 if (k == "RAWSUM")
2013 {
2014 continue;
2015 }
2016
2017 if (k == "ZDTASUM")
2018 {
2019 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
2020 origDatasum = it->second.value;
2021 continue;
2022 }
2023
2024 if (k == "EXTNAME")
2025 {
2026 tableName = it->second.value;
2027 }
2028
2029 k = k.substr(0,5);
2030
2031 if (k == "TTYPE")
2032 {//we have an original column name here.
2033 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
2034 continue;
2035 }
2036
2037 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
2038 {
2039 continue;
2040 }
2041
2042 if (k == "ZFORM" || k == "ZTYPE")
2043 {
2044 string tmpKey = it->second.fitsString;
2045 tmpKey[0] = 'T';
2046 reconstructedFile.SetKeyFromFitsString(tmpKey);
2047 continue;
2048 }
2049
2050 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
2051 }
2052
2053 if (tableName == "")
2054 {
2055 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
2056 return -1;
2057 }
2058
2059 //Restore the original columns
2060 for (uint32_t numCol=1; numCol<10000; numCol++)
2061 {
2062 ostringstream str;
2063 str << numCol;
2064 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
2065
2066 string ttype = verifyFile.GetStr("TTYPE"+str.str());
2067 string tform = verifyFile.GetStr("ZFORM"+str.str());
2068 char type = tform[tform.size()-1];
2069 string number = tform.substr(0, tform.size()-1);
2070 int numElems = atoi(number.c_str());
2071
2072 if (number == "") numElems=1;
2073
2074 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
2075 }
2076
2077 reconstructedFile.WriteTableHeader(tableName.c_str());
2078
2079 /************************************************************************************
2080 * Original header restored. Do the data
2081 ************************************************************************************/
2082
2083 //set pointers to the readout data to later be able to gather it to "buffer".
2084 readPointers.clear();
2085 readOffsets.clear();
2086 readElemSize.clear();
2087 readNumElems.clear();
2088 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2089 {
2090 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
2091 readOffsets.push_back(it->second.offset);
2092 readElemSize.push_back(it->second.size);
2093 readNumElems.push_back(it->second.num);
2094 }
2095
2096 //do the actual reconstruction work
2097 uint32_t i=1;
2098 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2099 {
2100 int count=0;
2101 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2102 {
2103 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2104 count++;
2105 }
2106 if (displayText) cout << "\r Row " << i << flush;
2107 reconstructedFile.WriteRow(buffer, rowWidth);
2108 i++;
2109 }
2110
2111 if (displayText) cout << endl;
2112
2113 //close reconstruction input and output
2114// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2115// verifyFile.close();
2116 if (!verifyFile.IsFileOk())
2117 cout << "ERROR: file checksums seems wrong" << endl;
2118
2119 reconstructedFile.close();
2120
2121 //get original and reconstructed checksum and datasum
2122 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2123 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2124
2125 //verify that no mistake was made
2126 if (origChecksum.second != newChecksum.second)
2127 {
2128 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2129 return -1;
2130 }
2131 if (origChecksum.first != newChecksum.first)
2132 {
2133 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2134 }
2135 else
2136 {
2137 if (true) cout << "Ok" << endl;
2138 }
2139
2140 buffer = buffer-4;
2141 delete[] buffer;
2142 return 0;
2143}
2144
Note: See TracBrowser for help on using the repository browser.