source: trunk/FACT++/src/fitsCompressor.cc@ 16936

Last change on this file since 16936 was 16898, checked in by lyard, 11 years ago
Propagated the new naming of enums to other files....
File size: 86.5 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22typedef struct TileHeader
23{
24 char id[4];
25 uint32_t numRows;
26 uint64_t size;
27 TileHeader(uint32_t nRows=0,
28 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
29 numRows(nRows),
30 size(s)
31 { };
32} __attribute__((__packed__)) TileHeader;
33
34typedef struct BlockHeader
35{
36 uint64_t size;
37 char ordering;
38 unsigned char numProcs;
39 BlockHeader(uint64_t s=0,
40 char o=zfits::kOrderByRow,
41 unsigned char n=1) : size(s),
42 ordering(o),
43 numProcs(n)
44 {}
45} __attribute__((__packed__)) BlockHeader;
46
47class CompressedFitsFile
48{
49public:
50 class HeaderEntry
51 {
52 public:
53 /**
54 * Default constructor
55 */
56 HeaderEntry(): _key(""),
57 _value(""),
58 _comment(""),
59 _fitsString("")
60 {
61 }
62
63 /**
64 * Regular constructor.
65 * @param key the name of the keyword entry
66 * @param value its value
67 * @param comment an optionnal comment to be placed after the value
68 */
69 template<typename T>
70 HeaderEntry(const string& k,
71 const T& val,
72 const string& comm) : _key(k),
73 _value(""),
74 _comment(comm),
75 _fitsString("")
76 {
77 setValue(val);
78 }
79
80 /**
81 * From fits.h
82 */
83 string Trim(const string &str, char c=' ')
84 {
85 // Trim Both leading and trailing spaces
86 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
87 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
88
89 // if all spaces or empty return an empty string
90 if (string::npos==pstart || string::npos==pend)
91 return string();
92
93 return str.substr(pstart, pend-pstart+1);
94 }
95 /**
96 * Constructor from the original fits entry
97 */
98 HeaderEntry(const string& line)
99 {
100 _fitsString = line.data();
101
102 //parse the line
103 _key = Trim(line.substr(0,8));
104 //COMMENT and/or HISTORY values
105 if (line.substr(8,2)!= "= ")
106 {
107 _value = "";
108 _comment = Trim(line.substr(10));
109 return;
110 }
111 string next=line.substr(10);
112 const size_t slash = next.find_first_of('/');
113 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
114 _comment = Trim(next.substr(slash+1));
115 }
116 /**
117 * Alternative constroctor from the fits entry
118 */
119 HeaderEntry(const vector<char>& lineVec)
120 {
121 HeaderEntry(string(lineVec.data()));
122 }
123 /**
124 * Destructor
125 */
126 virtual ~HeaderEntry(){}
127
128 const string& value() const {return _value;}
129 const string& key() const {return _key;}
130 const string& comment() const {return _comment;}
131 const string& fitsString() const {return _fitsString;}
132
133 /**
134 * Set a keyword value.
135 * @param value The value to be set
136 * @param update whether the value already exist or not. To be modified soon.
137 */
138 template<typename T>
139 void setValue(const T& val)
140 {
141 ostringstream str;
142 str << val;
143 _value = str.str();
144 buildFitsString();
145 };
146 /**
147 * Set the comment for a given entry
148 */
149 void setComment(const string& comm)
150 {
151 _comment = comm;
152 buildFitsString();
153 }
154
155 private:
156 /**
157 * Construct the FITS header string from the key, value and comment
158 */
159 void buildFitsString()
160 {
161 ostringstream str;
162 unsigned int totSize = 0;
163
164 // Tuncate the key if required
165 if (_key.length() > 8)
166 {
167 str << _key.substr(0, 8);
168 totSize += 8;
169 }
170 else
171 {
172 str << _key;
173 totSize += _key.length();
174 }
175
176 // Append space if key is less than 8 chars long
177 for (int i=totSize; i<8;i++)
178 {
179 str << " ";
180 totSize++;
181 }
182
183 // Add separator
184 str << "= ";
185 totSize += 2;
186
187 // Format value
188 if (_value.length() < 20)
189 for (;totSize<30-_value.length();totSize++)
190 str << " ";
191
192 if (_value.length() > 70)
193 {
194 str << _value.substr(0,70);
195 totSize += 70;
196 }
197 else
198 {
199 str << _value;
200 totSize += _value.size();
201 }
202
203 // If there is space remaining, add comment area
204 if (totSize < 77)
205 {
206 str << " / ";
207 totSize += 3;
208 if (totSize < 80)
209 {
210 unsigned int commentSize = 80 - totSize;
211 if (_comment.length() > commentSize)
212 {
213 str << _comment.substr(0,commentSize);
214 totSize += commentSize;
215 }
216 else
217 {
218 str << _comment;
219 totSize += _comment.length();
220 }
221 }
222 }
223
224 // If there is yet some free space, fill up the entry with spaces
225 for (int i=totSize; i<80;i++)
226 str << " ";
227
228 _fitsString = str.str();
229
230 // Check for correct completion
231 if (_fitsString.length() != 80)
232 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
233 }
234
235 string _key; ///< the key (name) of the header entry
236 string _value; ///< the value of the header entry
237 string _comment; ///< the comment associated to the header entry
238 string _fitsString; ///< the string that will be written to the fits file
239 };
240
241 /**
242 * Supported compressions
243 */
244 typedef enum
245 {
246 UNCOMPRESSED,
247 SMOOTHMAN
248 } FitsCompression;
249
250 /**
251 * Columns class
252 */
253 class ColumnEntry
254 {
255 public:
256 /**
257 * Default constructor
258 */
259 ColumnEntry();
260
261 /**
262 * Default destructor
263 */
264 virtual ~ColumnEntry(){}
265
266 /**
267 * Constructor from values
268 * @param n the column name
269 * @param t the column type
270 * @param numof the number of entries in the column
271 * @param comp the compression type for this column
272 */
273 ColumnEntry(const string& n,
274 char t,
275 int numOf,
276 BlockHeader& head,
277 vector<uint16_t>& seq) : _name(n),
278 _num(numOf),
279 _typeSize(0),
280 _offset(0),
281 _type(t),
282 _description(""),
283 _header(head),
284 _compSequence(seq)
285 {
286 switch (t)
287 {
288 case 'L':
289 case 'A':
290 case 'B': _typeSize = 1; break;
291 case 'I': _typeSize = 2; break;
292 case 'J':
293 case 'E': _typeSize = 4; break;
294 case 'K':
295 case 'D': _typeSize = 8; break;
296 default:
297 cout << "Error: typename " << t << " missing in the current implementation" << endl;
298 };
299
300 ostringstream str;
301 str << "data format of field: ";
302
303 switch (t)
304 {
305 case 'L': str << "1-byte BOOL"; break;
306 case 'A': str << "1-byte CHAR"; break;
307 case 'B': str << "BYTE"; break;
308 case 'I': str << "2-byte INTEGER"; break;
309 case 'J': str << "4-byte INTEGER"; break;
310 case 'K': str << "8-byte INTEGER"; break;
311 case 'E': str << "4-byte FLOAT"; break;
312 case 'D': str << "8-byte FLOAT"; break;
313 }
314
315 _description = str.str();
316 }
317
318 const string& name() const { return _name;};
319 int width() const { return _num*_typeSize;};
320 int offset() const { return _offset; };
321 int numElems() const { return _num; };
322 int sizeOfElems() const { return _typeSize;};
323 void setOffset(int off) { _offset = off;};
324 char type() const { return _type;};
325 string getDescription() const { return _description;}
326 BlockHeader& getBlockHeader() { return _header;}
327 const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
328 const char& getColumnOrdering() const { return _header.ordering;}
329
330
331 string getCompressionString() const
332 {
333 return "FACT";
334 /*
335 ostringstream str;
336 for (uint32_t i=0;i<_compSequence.size();i++)
337 switch (_compSequence[i])
338 {
339 case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
340 case FACT_SMOOTHING: str << "SMOOTHING "; break;
341 case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
342 };
343 return str.str();*/
344 }
345
346 private:
347
348 string _name; ///< name of the column
349 int _num; ///< number of elements contained in one row of this column
350 int _typeSize; ///< the number of bytes taken by one element
351 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
352 char _type; ///< the type of the column, as specified by the fits documentation
353 string _description; ///< a description for the column. It will be placed in the header
354 BlockHeader _header;
355 vector<uint16_t> _compSequence;
356 };
357
358 public:
359 ///@brief default constructor. Assigns a default number of rows and tiles
360 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
361
362 ///@brief default destructor
363 virtual ~CompressedFitsFile();
364
365 ///@brief get the header of the file
366 vector<HeaderEntry>& getHeaderEntries() { return _header;}
367
368 protected:
369 ///@brief protected function to allocate the intermediate buffers
370 bool reallocateBuffers();
371
372 //FITS related stuff
373 vector<HeaderEntry> _header; ///< Header keys
374 vector<ColumnEntry> _columns; ///< Columns in the file
375 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
376 uint32_t _numRowsPerTile; ///< Number of rows per tile
377 uint32_t _totalNumRows; ///< Total number of raws
378 uint32_t _rowWidth; ///< Total number of bytes in one row
379 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
380 char* _buffer; ///< Memory buffer to store rows while they are not compressed
381 Checksum _checksum; ///< Checksum for asserting the consistency of the data
382 fstream _file; ///< The actual file streamer for accessing disk data
383
384 //compression related stuff
385 typedef pair<int64_t, int64_t> CatalogEntry;
386 typedef vector<CatalogEntry> CatalogRow;
387 typedef vector<CatalogRow> CatalogType;
388 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
389 uint64_t _heapPtr; ///< the address in the file of the heap area
390 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
391 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
392
393 //thread related stuff
394 uint32_t _numThreads; ///< The number of threads that will be used to compress
395 uint32_t _threadIndex; ///< A variable to assign threads indices
396 vector<pthread_t> _thread; ///< The thread handler of the compressor
397 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
398 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
399
400 //thread states. Not all used, but they do not hurt
401 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
402 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
403 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
404 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
405 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
406 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
407
408 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
409};
410
411class CompressedFitsWriter : public CompressedFitsFile
412{
413 public:
414 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
415 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
416
417 ///@brief default destructor
418 virtual ~CompressedFitsWriter();
419
420 ///@brief add one column to the file
421 bool addColumn(const ColumnEntry& column);
422
423 ///@brief sets a given header key
424 bool setHeaderKey(const HeaderEntry&);
425
426 ///@brief open a new fits file
427 bool open(const string& fileName, const string& tableName="Data");
428
429 ///@brief close the opened file
430 bool close();
431
432 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
433 bool writeBinaryRow(const char* bufferToWrite);
434
435 uint32_t getRowWidth();
436
437 ///@brief assign a given (already loaded) drs calibration
438 void setDrsCalib(int16_t* data);
439
440 ///@brief set the number of worker threads compressing the data.
441 bool setNumWorkingThreads(uint32_t num);
442
443 private:
444 ///@brief compresses one buffer of data, the one given by threadIndex
445 uint64_t compressBuffer(uint32_t threadIndex);
446
447 ///@brief writes an already compressed buffer to disk
448 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
449
450 ///@brief add the header checksum to the datasum
451 void addHeaderChecksum(Checksum& checksum);
452
453 ///@brief write the header. If closingFile is set to true, checksum is calculated
454 void writeHeader(bool closingFile = false);
455
456 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
457 void writeCatalog(bool closingFile=false);
458
459 /// FIXME this was a bad idea. Move everything to the regular header
460 vector<HeaderEntry> _defaultHeader;
461
462 /// the main function compressing the data
463 static void* threadFunction(void* context);
464
465 /// Write the drs calibration to disk, if any
466 void writeDrsCalib();
467
468 /// Copy and transpose (or semi-transpose) one tile of data
469 void copyTransposeTile(uint32_t index);
470
471 /// Specific compression functions
472 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
473 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
474 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
475 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
476
477 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
478 int16_t* _drsCalibData; ///< array of the Drs baseline mean
479 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
480 pthread_mutex_t _mutex; ///< mutex for compressing threads
481
482
483 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
484 static string _fitsHeader; ///< the default header to be written in every fits file
485};
486
487const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
488const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
489const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
490const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
491const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
492const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
493
494template<>
495void CompressedFitsFile::HeaderEntry::setValue(const string& v)
496{
497 string val = v;
498 if (val.size() > 2 && val[0] == '\'')
499 {
500 size_t pos = val.find_last_of("'");
501 if (pos != string::npos && pos != 0)
502 val = val.substr(1, pos-1);
503 }
504 ostringstream str;
505
506 str << "'" << val << "'";
507 for (int i=str.str().length(); i<20;i++)
508 str << " ";
509 _value = str.str();
510 buildFitsString();
511}
512
513/**
514 * Default header to be written in all fits files
515 */
516string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
517 "BITPIX = 8 / number of bits per data pixel "
518 "NAXIS = 0 / number of data axes "
519 "EXTEND = T / FITS dataset may contain extensions "
520 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
521 "DATASUM = ' 0' / Checksum for the data block "
522 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
523 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
524 "END "
525 " "
526 " "
527 " "
528 " "
529 " "
530 " "
531 " "
532 " "
533 " "
534 " "
535 " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " "
550 " "
551 " ";
552
553/**
554 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
555 *
556 */
557string CompressedFitsWriter::_emptyBlock = " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " "
571 " "
572 " "
573 " "
574 " "
575 " "
576 " "
577 " "
578 " "
579 " "
580 " "
581 " "
582 " "
583 " "
584 " "
585 " "
586 " "
587 " "
588 " "
589 " "
590 " "
591 " "
592 " ";
593
594CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
595/****************************************************************
596 * SUPER CLASS DEFAULT CONSTRUCTOR
597 ****************************************************************/
598CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
599 uint32_t numRowsPerTile) : _header(),
600 _columns(),
601 _numTiles(numTiles),
602 _numRowsPerTile(numRowsPerTile),
603 _totalNumRows(0),
604 _rowWidth(0),
605 _headerFlushed(false),
606 _buffer(NULL),
607 _checksum(0),
608 _heapPtr(0),
609 _transposedBuffer(1),
610 _compressedBuffer(1),
611 _numThreads(1),
612 _threadIndex(0),
613 _thread(1),
614 _threadNumRows(1),
615 _threadStatus(1)
616{
617 _catalog.resize(_numTiles);
618 _transposedBuffer[0] = NULL;
619 _compressedBuffer[0] = NULL;
620 _threadStatus[0] = _THREAD_WAIT_;
621 _threadNumRows[0] = 0;
622}
623
624/****************************************************************
625 * SUPER CLASS DEFAULT DESTRUCTOR
626 ****************************************************************/
627CompressedFitsFile::~CompressedFitsFile()
628{
629 if (_buffer != NULL)
630 {
631 _buffer = _buffer-4;
632 delete[] _buffer;
633 _buffer = NULL;
634 for (uint32_t i=0;i<_numThreads;i++)
635 {
636 _compressedBuffer[i] = _compressedBuffer[i]-4;
637 delete[] _transposedBuffer[i];
638 delete[] _compressedBuffer[i];
639 _transposedBuffer[i] = NULL;
640 _compressedBuffer[i] = NULL;
641 }
642 }
643 if (_file.is_open())
644 _file.close();
645}
646
647/****************************************************************
648 * REALLOCATE BUFFER
649 ****************************************************************/
650bool CompressedFitsFile::reallocateBuffers()
651{
652 if (_buffer)
653 {
654 _buffer = _buffer - 4;
655 delete[] _buffer;
656 for (uint32_t i=0;i<_compressedBuffer.size();i++)
657 {
658 _compressedBuffer[i] = _compressedBuffer[i]-4;
659 delete[] _transposedBuffer[i];
660 delete[] _compressedBuffer[i];
661 }
662 }
663 _buffer = new char[_rowWidth*_numRowsPerTile + 12];
664 if (_buffer == NULL) return false;
665 memset(_buffer, 0, 4);
666 _buffer = _buffer + 4;
667 if (_compressedBuffer.size() != _numThreads)
668 {
669 _transposedBuffer.resize(_numThreads);
670 _compressedBuffer.resize(_numThreads);
671 }
672 for (uint32_t i=0;i<_numThreads;i++)
673 {
674 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
675 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader) + 12]; //use a bit more memory for compression flags and checksumming
676 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
677 return false;
678 //shift the compressed buffer by 4 bytes, for checksum calculation
679 memset(_compressedBuffer[i], 0, 4);
680 _compressedBuffer[i] = _compressedBuffer[i]+4;
681 //initialize the tile header
682 TileHeader tileHeader;
683 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
684 }
685 return true;
686}
687
688/****************************************************************
689 * DEFAULT WRITER CONSTRUCTOR
690 ****************************************************************/
691CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
692 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
693 _checkOffset(0),
694 _drsCalibData(NULL),
695 _threadLooper(0)
696{
697 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
698 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
699 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
700 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
701 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
702 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
703 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
704 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
705 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
706 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
707 //compression stuff
708 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
709 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
710 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
711 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
712 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
713 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
714 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
715
716 pthread_mutex_init(&_mutex, NULL);
717}
718
719/****************************************************************
720 * DEFAULT DESTRUCTOR
721 ****************************************************************/
722CompressedFitsWriter::~CompressedFitsWriter()
723{
724 pthread_mutex_destroy(&_mutex);
725}
726
727/****************************************************************
728 * SET THE POINTER TO THE DRS CALIBRATION
729 ****************************************************************/
730void CompressedFitsWriter::setDrsCalib(int16_t* data)
731{
732 _drsCalibData = data;
733}
734
735/****************************************************************
736 * SET NUM WORKING THREADS
737 ****************************************************************/
738bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
739{
740 if (_file.is_open())
741 return false;
742 if (num < 1 || num > 64)
743 {
744 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
745 return false;
746 }
747 _numThreads = num;
748 _transposedBuffer[0] = NULL;
749 _compressedBuffer[0] = NULL;
750 _threadStatus.resize(num);
751 _thread.resize(num);
752 _threadNumRows.resize(num);
753 for (uint32_t i=0;i<num;i++)
754 {
755 _threadNumRows[i] = 0;
756 _threadStatus[i] = _THREAD_WAIT_;
757 }
758 return reallocateBuffers();
759}
760
761/****************************************************************
762 * WRITE DRS CALIBRATION TO FILE
763 ****************************************************************/
764void CompressedFitsWriter::writeDrsCalib()
765{
766 //if file was not loaded, ignore
767 if (_drsCalibData == NULL)
768 return;
769 uint64_t whereDidIStart = _file.tellp();
770 vector<HeaderEntry> header;
771 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
772 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
773 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
774 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
775 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
776 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
777 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
778 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
779 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
780 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
781 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
782 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
783 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
784
785 for (uint32_t i=0;i<header.size();i++)
786 _file.write(header[i].fitsString().c_str(), 80);
787 //End the header
788 _file.write("END ", 80);
789 long here = _file.tellp();
790 if (here%2880)
791 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
792 //now write the data itself
793 int16_t* swappedBytes = new int16_t[1024];
794 Checksum checksum;
795 for (int32_t i=0;i<1440;i++)
796 {
797 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
798 for (int32_t j=0;j<2048;j+=2)
799 {
800 int8_t inter;
801 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
802 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
803 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
804 }
805 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
806 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
807 }
808 uint64_t whereDidIStop = _file.tellp();
809 delete[] swappedBytes;
810 //No need to pad the data, as (1440*1024*2)%2880==0
811
812 //calculate the checksum from the header
813 ostringstream str;
814 str << checksum.val();
815 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
816 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
817 checksum.add(it->fitsString().c_str(), 80);
818 string end("END ");
819 string space(" ");
820 checksum.add(end.c_str(), 80);
821 int headerRowsLeft = 36 - (header.size() + 1)%36;
822 for (int i=0;i<headerRowsLeft;i++)
823 checksum.add(space.c_str(), 80);
824 //udpate the checksum keyword
825 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
826 //and eventually re-write the header data
827 _file.seekp(whereDidIStart);
828 for (uint32_t i=0;i<header.size();i++)
829 _file.write(header[i].fitsString().c_str(), 80);
830 _file.seekp(whereDidIStop);
831}
832
833/****************************************************************
834 * ADD COLUMN
835 ****************************************************************/
836bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
837{
838 if (_totalNumRows != 0)
839 {
840 cout << "Error: cannot add new columns once first row has been written" << endl;
841 return false;
842 }
843 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
844 {
845 if (it->name() == column.name())
846 {
847 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
848 return false;
849 }
850 }
851 _columns.push_back(column);
852 _columns.back().setOffset(_rowWidth);
853 _rowWidth += column.width();
854 reallocateBuffers();
855
856 ostringstream str, str2, str3;
857 str << "TTYPE" << _columns.size();
858 str2 << column.name();
859 str3 << "label for field ";
860 if (_columns.size() < 10) str3 << " ";
861 if (_columns.size() < 100) str3 << " ";
862 str3 << _columns.size();
863 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
864 str.str("");
865 str2.str("");
866 str3.str("");
867 str << "TFORM" << _columns.size();
868 str2 << "1QB";
869 str3 << "data format of field " << _columns.size();
870 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
871 str.str("");
872 str2.str("");
873 str3.str("");
874 str << "ZFORM" << _columns.size();
875 str2 << column.numElems() << column.type();
876 str3 << "Original format of field " << _columns.size();
877 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
878 str.str("");
879 str2.str("");
880 str3.str("");
881 str << "ZCTYP" << _columns.size();
882 str2 << column.getCompressionString();
883 str3 << "Comp. Scheme of field " << _columns.size();
884 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
885 //resize the catalog vector accordingly
886 for (uint32_t i=0;i<_numTiles;i++)
887 {
888 _catalog[i].resize(_columns.size());
889 for (uint32_t j=0;j<_catalog[i].size();j++)
890 _catalog[i][j] = make_pair(0,0);
891 }
892 return true;
893}
894
895/****************************************************************
896 * SET HEADER KEY
897 ****************************************************************/
898bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
899{
900 HeaderEntry ent = entry;
901 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
902 {
903 if (it->key() == entry.key())
904 {
905 if (entry.comment() == "")
906 ent.setComment(it->comment());
907 (*it) = ent;
908 _headerFlushed = false;
909 return true;
910 }
911 }
912 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
913 {
914 if (it->key() == entry.key())
915 {
916 if (entry.comment() == "")
917 ent.setComment(it->comment());
918 (*it) = ent;
919 _headerFlushed = false;
920 return true;
921 }
922 }
923 if (_totalNumRows != 0)
924 {
925 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
926 return false;
927 }
928 _header.push_back(entry);
929 _headerFlushed = false;
930 return true;
931}
932
933/****************************************************************
934 * OPEN
935 ****************************************************************/
936bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
937{
938 _file.open(fileName.c_str(), ios_base::out);
939 if (!_file.is_open())
940 {
941 cout << "Error: Could not open the file (" << fileName << ")." << endl;
942 return false;
943 }
944 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
945 _headerFlushed = false;
946 _threadIndex = 0;
947 //create requested number of threads
948 for (uint32_t i=0;i<_numThreads;i++)
949 pthread_create(&(_thread[i]), NULL, threadFunction, this);
950 //wait for the threads to start
951 while (_numThreads != _threadIndex)
952 usleep(1000);
953 //set the writing fence to the last thread
954 _threadIndex = _numThreads-1;
955 return (_file.good());
956}
957
958/****************************************************************
959 * WRITE HEADER
960 ****************************************************************/
961void CompressedFitsWriter::writeHeader(bool closingFile)
962{
963 if (_headerFlushed)
964 return;
965 if (!_file.is_open())
966 return;
967
968 long cPos = _file.tellp();
969
970 _file.seekp(0);
971
972 _file.write(_fitsHeader.c_str(), 2880);
973
974 //Write the DRS calib table here !
975 writeDrsCalib();
976
977 //we are now at the beginning of the main table. Write its header
978 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
979 _file.write(it->fitsString().c_str(), 80);
980
981 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
982 _file.write(it->fitsString().c_str(), 80);
983
984 _file.write("END ", 80);
985 long here = _file.tellp();
986 if (here%2880)
987 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
988
989 _headerFlushed = true;
990
991 here = _file.tellp();
992
993 if (here%2880)
994 cout << "Error: seems that header did not finish at the end of a block." << endl;
995
996 if (here > cPos && cPos != 0)
997 {
998 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
999 return;
1000 }
1001
1002 here = _file.tellp();
1003 writeCatalog(closingFile);
1004
1005 here = _file.tellp() - here;
1006 _heapPtr = here;
1007
1008 if (cPos != 0)
1009 _file.seekp(cPos);
1010}
1011
1012/****************************************************************
1013 * WRITE CATALOG
1014 * WARNING: writeCatalog is only meant to be used by writeHeader.
1015 * external usage will most likely corrupt the file
1016 ****************************************************************/
1017void CompressedFitsWriter::writeCatalog(bool closingFile)
1018{
1019 uint32_t sizeWritten = 0;
1020 for (uint32_t i=0;i<_catalog.size();i++)
1021 {
1022 for (uint32_t j=0;j<_catalog[i].size();j++)
1023 {
1024 //swap the bytes
1025 int8_t swappedEntry[16];
1026 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1027 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1028 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1029 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1030 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1031 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1032 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1033 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1034
1035 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1036 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1037 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1038 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1039 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1040 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1041 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1042 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1043 if (closingFile)
1044 {
1045 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1046 }
1047 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1048 sizeWritten += 2*sizeof(int64_t);
1049 }
1050 }
1051
1052 //we do not reserve space for now because fverify does not like that.
1053 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1054 return ;
1055
1056 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1057 if (sizeWritten % 2880 != 0)
1058 {
1059 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1060 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1061 }
1062}
1063
1064/****************************************************************
1065 * ADD HEADER CHECKSUM
1066 ****************************************************************/
1067void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1068{
1069 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1070 _checksum.add(it->fitsString().c_str(), 80);
1071 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1072 _checksum.add(it->fitsString().c_str(), 80);
1073 string end("END ");
1074 string space(" ");
1075 checksum.add(end.c_str(), 80);
1076 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1077 for (int i=0;i<headerRowsLeft;i++)
1078 checksum.add(space.c_str(), 80);
1079}
1080
1081/****************************************************************
1082 * CLOSE
1083 ****************************************************************/
1084bool CompressedFitsWriter::close()
1085{
1086 for (uint32_t i=0;i<_numThreads;i++)
1087 while (_threadStatus[i] != _THREAD_WAIT_)
1088 usleep(100000);
1089 for (uint32_t i=0;i<_numThreads;i++)
1090 _threadStatus[i] = _THREAD_EXIT_;
1091 for (uint32_t i=0;i<_numThreads;i++)
1092 pthread_join(_thread[i], NULL);
1093 //flush the rows that were not written yet
1094 if (_totalNumRows%_numRowsPerTile != 0)
1095 {
1096 copyTransposeTile(0);
1097
1098 _threadNumRows[0] = _totalNumRows;
1099 uint32_t numBytes = compressBuffer(0);
1100 writeCompressedDataToDisk(0, numBytes);
1101 }
1102 //compression stuff
1103 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1104 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1105 //TODO calculate the real offset from the main table to the start of the HEAP data area
1106 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1107 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1108
1109 //regular stuff
1110 if (_catalog.size() > 0)
1111 {
1112 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1113 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1114 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1115 int64_t heapSize = 0;
1116 int64_t compressedOffset = 0;
1117 for (uint32_t i=0;i<_catalog.size();i++)
1118 {
1119 compressedOffset += sizeof(TileHeader);
1120 heapSize += sizeof(TileHeader);
1121 for (uint32_t j=0;j<_catalog[i].size();j++)
1122 {
1123 heapSize += _catalog[i][j].first;
1124// cout << "heapSize: " << heapSize << endl;
1125 //set the catalog offsets to their actual values
1126 _catalog[i][j].second = compressedOffset;
1127 compressedOffset += _catalog[i][j].first;
1128 //special case if entry has zero length
1129 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1130 }
1131 }
1132 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1133 }
1134 else
1135 {
1136 setHeaderKey(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
1137 setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1138 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1139 setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1140 }
1141 ostringstream str;
1142
1143 writeHeader(true);
1144
1145 str.str("");
1146 str << _checksum.val();
1147
1148 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1149 addHeaderChecksum(_checksum);
1150 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1151 //update header value
1152 writeHeader();
1153 //update file length
1154 long here = _file.tellp();
1155 if (here%2880)
1156 {
1157 vector<char> nullVec(2880 - here%2880, 0);
1158 _file.write(nullVec.data(), 2880 - here%2880);
1159 }
1160 _file.close();
1161 return true;
1162}
1163
1164/****************************************************************
1165 * COPY TRANSPOSE TILE
1166 ****************************************************************/
1167void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1168{
1169 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1170
1171 //copy the tile and transpose it
1172 uint32_t offset = 0;
1173 for (uint32_t i=0;i<_columns.size();i++)
1174 {
1175 switch (_columns[i].getColumnOrdering())//getCompression())
1176 {
1177 case zfits::kOrderByRow:
1178 for (uint32_t k=0;k<thisRoundNumRows;k++)
1179 {//regular, "semi-transposed" copy
1180 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1181 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1182 }
1183 break;
1184
1185 case zfits::kOrderByCol :
1186 for (int j=0;j<_columns[i].numElems();j++)
1187 for (uint32_t k=0;k<thisRoundNumRows;k++)
1188 {//transposed copy
1189 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1190 offset += _columns[i].sizeOfElems();
1191 }
1192 break;
1193 default:
1194 cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1195
1196 };
1197 }
1198}
1199
1200/****************************************************************
1201 * WRITE BINARY ROW
1202 ****************************************************************/
1203bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1204{
1205 if (_totalNumRows == 0)
1206 writeHeader();
1207
1208 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1209 _totalNumRows++;
1210 if (_totalNumRows%_numRowsPerTile == 0)
1211 {
1212 //which is the next thread that we should use ?
1213 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1214 usleep(100000);
1215
1216 copyTransposeTile(_threadLooper);
1217
1218 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1219 usleep(100000);
1220
1221 _threadNumRows[_threadLooper] = _totalNumRows;
1222 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1223 _threadLooper = (_threadLooper+1)%_numThreads;
1224 }
1225 return _file.good();
1226}
1227
1228uint32_t CompressedFitsWriter::getRowWidth()
1229{
1230 return _rowWidth;
1231}
1232
1233/****************************************************************
1234 * COMPRESS BUFFER
1235 ****************************************************************/
1236uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1237{
1238 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1239 return numRows*sizeOfElems*numRowElems;
1240}
1241
1242/****************************************************************
1243 * COMPRESS BUFFER
1244 ****************************************************************/
1245uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1246{
1247 string huffmanOutput;
1248 uint32_t previousHuffmanSize = 0;
1249 if (numRows < 2)
1250 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1251 return numRows*sizeOfElems*numRowElems + 1000;
1252 }
1253 if (sizeOfElems < 2 )
1254 {
1255 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1256 return 0;
1257 }
1258 uint32_t huffmanOffset = 0;
1259 for (uint32_t j=0;j<numRowElems;j++)
1260 {
1261 Huffman::Encode(huffmanOutput,
1262 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1263 numRows*(sizeOfElems/2));
1264 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1265 huffmanOffset += sizeof(uint32_t);
1266 previousHuffmanSize = huffmanOutput.size();
1267 }
1268 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1269
1270 //only copy if not larger than not-compressed size
1271 if (totalSize < numRows*sizeOfElems*numRowElems)
1272 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1273
1274 return totalSize;
1275}
1276
1277/****************************************************************
1278 * COMPRESS BUFFER
1279 ****************************************************************/
1280uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1281{
1282 uint32_t colWidth = numRowElems;
1283 for (int j=colWidth*numRows-1;j>1;j--)
1284 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1285 //call the huffman transposed
1286 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1287}
1288
1289uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1290{
1291 uint32_t colWidth = numRowElems;
1292 for (int j=colWidth*numRows-1;j>1;j--)
1293 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1294
1295 return numRows*sizeOfElems*numRowElems;
1296}
1297/****************************************************************
1298 * COMPRESS BUFFER
1299 ****************************************************************/
1300uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1301{
1302 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1303 uint32_t offset=0;
1304 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1305 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1306
1307 //now compress each column one by one by calling compression on arrays
1308 for (uint32_t i=0;i<_columns.size();i++)
1309 {
1310 _catalog[currentCatalogRow][i].second = compressedOffset;
1311
1312 if (_columns[i].numElems() == 0) continue;
1313
1314 BlockHeader& head = _columns[i].getBlockHeader();
1315 const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1316 //set the default byte telling if uncompressed the compressed Flag
1317 uint64_t previousOffset = compressedOffset;
1318 //skip header data
1319 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1320
1321 for (uint32_t j=0;j<sequence.size(); j++)
1322 {
1323 switch (sequence[j])
1324 {
1325 case zfits::kFactRaw:
1326// if (head.numProcs == 1)
1327 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1328 break;
1329 case zfits::kFactSmoothing:
1330 applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1331 break;
1332 case zfits::kFactHuffman16:
1333 if (head.ordering == zfits::kOrderByCol)
1334 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1335 else
1336 compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1337 break;
1338 default:
1339 cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1340 break;
1341 }
1342 }
1343
1344 //check if compressed size is larger than uncompressed
1345 if (sequence[0] != zfits::kFactRaw &&
1346 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1347 {//if so set flag and redo it uncompressed
1348 cout << "REDOING UNCOMPRESSED" << endl;
1349 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1350 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1351 BlockHeader he;
1352 he.size = compressedOffset - previousOffset;
1353 he.numProcs = 1;
1354 he.ordering = zfits::kOrderByRow;
1355 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1356 _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
1357 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1358 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1359 continue;
1360 }
1361 head.size = compressedOffset - previousOffset;
1362 memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1363 memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1364
1365 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1366 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1367 }
1368
1369 TileHeader tHead(thisRoundNumRows, compressedOffset);
1370 memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1371 return compressedOffset;
1372}
1373
1374/****************************************************************
1375 * WRITE COMPRESS DATA TO DISK
1376 ****************************************************************/
1377bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1378{
1379 char* checkSumPointer = _compressedBuffer[threadID];
1380 int32_t extraBytes = 0;
1381 uint32_t sizeToChecksum = sizeToWrite;
1382 if (_checkOffset != 0)
1383 {//should we extend the array to the left ?
1384 sizeToChecksum += _checkOffset;
1385 checkSumPointer -= _checkOffset;
1386 memset(checkSumPointer, 0, _checkOffset);
1387 }
1388 if (sizeToChecksum%4 != 0)
1389 {//should we extend the array to the right ?
1390 extraBytes = 4 - (sizeToChecksum%4);
1391 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1392 sizeToChecksum += extraBytes;
1393 }
1394 //do the checksum
1395 _checksum.add(checkSumPointer, sizeToChecksum);
1396// cout << endl << "Checksum: " << _checksum.val() << endl;
1397 _checkOffset = (4 - extraBytes)%4;
1398 //write data to disk
1399 _file.write(_compressedBuffer[threadID], sizeToWrite);
1400 return _file.good();
1401}
1402
1403/****************************************************************
1404 * WRITER THREAD LOOP
1405 ****************************************************************/
1406void* CompressedFitsWriter::threadFunction(void* context)
1407{
1408 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1409
1410 uint32_t myID = 0;
1411 pthread_mutex_lock(&(myself->_mutex));
1412 myID = myself->_threadIndex++;
1413 pthread_mutex_unlock(&(myself->_mutex));
1414 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1415
1416 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1417 {
1418 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1419 usleep(100000);
1420 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1421 continue;
1422 uint32_t numBytes = myself->compressBuffer(myID);
1423 myself->_threadStatus[myID] = _THREAD_WRITE_;
1424
1425 //wait for the previous data to be written
1426 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1427 usleep(1000);
1428 //do the actual writing to disk
1429 pthread_mutex_lock(&(myself->_mutex));
1430 myself->writeCompressedDataToDisk(myID, numBytes);
1431 myself->_threadIndex = myID;
1432 pthread_mutex_unlock(&(myself->_mutex));
1433 myself->_threadStatus[myID] = _THREAD_WAIT_;
1434 }
1435 return NULL;
1436}
1437
1438/****************************************************************
1439 * PRINT USAGE
1440 ****************************************************************/
1441void printUsage()
1442{
1443 cout << endl;
1444 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1445 " and compresses it.\n It can use a drs calibration in order to"
1446 " improve the compression ratio. If so, the input calibration"
1447 " is embedded into the compressed file.\n"
1448 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1449 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1450 "Usage: Compressed_Fits_Test <inputFile>";
1451 cout << endl;
1452}
1453
1454/****************************************************************
1455 * PRINT HELP
1456 ****************************************************************/
1457void printHelp()
1458{
1459 cout << endl;
1460 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1461 "The fz extension will be added, replacing the .gz one if required \n"
1462 "If output is specified, then it will replace the automatically generated output filename\n"
1463 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1464 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1465 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1466 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1467 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1468 "UNCOMPRESSED\n"
1469 "AMPLITUDE\n"
1470 "HUFFMAN\n"
1471 "SMOOTHMAN\n"
1472 "INT_WAVELET\n"
1473 "\n"
1474 "--quiet removes any textual output, except error messages\n"
1475 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1476 ;
1477 cout << endl << endl;
1478}
1479
1480/****************************************************************
1481 * SETUP CONFIGURATION
1482 ****************************************************************/
1483void setupConfiguration(Configuration& conf)
1484{
1485 po::options_description configs("FitsCompressor options");
1486 configs.add_options()
1487 ("inputFile,i", vars<string>(), "Input file")
1488 ("drs,d", var<string>(), "Input drs calibration file")
1489 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1490 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1491 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1492 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1493 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1494 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1495 ;
1496 po::positional_options_description positional;
1497 positional.add("inputFile", -1);
1498 conf.AddOptions(configs);
1499 conf.SetArgumentPositions(positional);
1500}
1501
1502/****************************************************************
1503 * MAIN
1504 ****************************************************************/
1505int main(int argc, const char** argv)
1506{
1507 Configuration conf(argv[0]);
1508 conf.SetPrintUsage(printUsage);
1509 setupConfiguration(conf);
1510
1511 if (!conf.DoParse(argc, argv, printHelp))
1512 return -1;
1513
1514 //initialize the file names to nothing.
1515 string fileNameIn = "";
1516 string fileNameOut = "";
1517 string drsFileName = "";
1518 uint32_t numRowsPerTile = 100;
1519 bool displayText=true;
1520
1521 //parse configuration
1522 if (conf.Get<bool>("quiet")) displayText = false;
1523 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1524 if (inputFileNameVec.size() != 1)
1525 {
1526 cout << "Error: ";
1527 if (inputFileNameVec.size() == 0) cout << "no";
1528 else cout << inputFileNameVec.size();
1529 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1530 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1531 cout << inputFileNameVec[i] << endl;
1532 return -1;
1533 }
1534
1535 //Assign the input filename
1536 fileNameIn = inputFileNameVec[0];
1537
1538 //Check if we have a drs calib too
1539 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1540
1541 //Should we verify the data ?
1542 bool verifyDataPlease = false;
1543 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1544
1545
1546 //should we use a specific output filename ?
1547 if (conf.Has("output"))
1548 fileNameOut = conf.Get<string>("output");
1549 else
1550 {
1551 size_t pos = fileNameIn.find(".fits");
1552 if (pos == string::npos)
1553 {
1554 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1555 return -1;
1556 }
1557 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1558 }
1559
1560 //should we use specific compression on some columns ?
1561 const vector<string> columnsCompression = conf.Vec<string>("compression");
1562
1563 //split up values between column names and compression scheme
1564 vector<std::pair<string, string>> compressions;
1565 for (unsigned int i=0;i<columnsCompression.size();i++)
1566 {
1567 size_t pos = columnsCompression[i].find_first_of("=");
1568 if (pos == string::npos)
1569 {
1570 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1571 return -1;
1572 }
1573 string comp = columnsCompression[i].substr(pos+1);
1574 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1575 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1576 {
1577 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1578 return -1;
1579 }
1580 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1581 }
1582
1583 //How many rows per tile should we use ?
1584 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1585
1586 /************************************************************************************
1587 * Done reading configuration. Open relevant files
1588 ************************************************************************************/
1589
1590 //Open input's fits file
1591 factfits inFile(fileNameIn);
1592
1593 if (inFile.IsCompressedFITS())
1594 {
1595 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1596 return -1;
1597 }
1598
1599 //decide how many tiles should be put in the compressed file
1600 uint32_t originalNumRows = inFile.GetNumRows();
1601 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1602 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1603
1604 //should we use a specific number of threads for compressing ?
1605 unsigned int numThreads = 1;
1606 if (conf.Has("threads"))
1607 {
1608 numThreads = conf.Get<unsigned int>("threads");
1609 outFile.setNumWorkingThreads(numThreads);
1610 }
1611
1612 if (!outFile.open(fileNameOut))
1613 {
1614 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1615 return -1;
1616 }
1617
1618 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1619 factfits* drsFile = NULL;
1620 //try to open the Drs file. If any.
1621 if (drsFileName != "")
1622 {
1623 try
1624 {
1625 drsFile = new factfits(drsFileName);
1626 }
1627 catch (...)
1628 {
1629 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1630 return -1;
1631 }
1632 }
1633
1634 if (displayText)
1635 {
1636 cout << endl;
1637 cout << "**********************" << endl;
1638 cout << "Will compress from : " << fileNameIn << endl;
1639 cout << "to : " << fileNameOut << endl;
1640 if (drsFileName != "")
1641 cout << "while calibrating with: " << drsFileName << endl;
1642 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1643 cout << "Data will be verified : ";
1644 if (verifyDataPlease)
1645 cout << "yes" << endl;
1646 else
1647 cout << "no (WARNING !)" << endl;
1648 cout << "**********************" << endl;
1649 cout << endl;
1650 }
1651
1652 /************************************************************************************
1653 * Done opening input files. Allocate memory and configure output file
1654 ************************************************************************************/
1655
1656 //allocate the buffer for temporary storage of each read/written row
1657 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1658 char* buffer = new char[rowWidth + 12];
1659 memset(buffer, 0, 4);
1660 buffer = buffer+4;
1661
1662 //get the source columns
1663 const fits::Table::Columns& columns = inFile.GetColumns();
1664 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1665 if (displayText)
1666 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1667
1668 //Add columns.
1669 uint32_t totalRowWidth = 0;
1670 for (uint32_t i=0;i<sortedColumns.size(); i++)
1671 {
1672 //get column name
1673 ostringstream str;
1674 str << "TTYPE" << i+1;
1675 string colName = inFile.GetStr(str.str());
1676 if (displayText)
1677 {
1678 cout << "Column " << colName;
1679 for (uint32_t j=colName.size();j<21;j++)
1680 cout << " ";
1681 cout << " -> ";
1682 }
1683
1684 //get header structures
1685 BlockHeader rawHeader;
1686 BlockHeader smoothmanHeader(0, zfits::kOrderByRow, 2);
1687 vector<uint16_t> rawProcessings(1);
1688 rawProcessings[0] = zfits::kFactRaw;
1689 vector<uint16_t> smoothmanProcessings(2);
1690 smoothmanProcessings[0] = zfits::kFactSmoothing;
1691 smoothmanProcessings[1] = zfits::kFactHuffman16;
1692// smoothmanProcessings[2] = FACT_RAW;
1693
1694 totalRowWidth += sortedColumns[i].bytes;
1695
1696 //first lets see if we do have an explicit request
1697 bool explicitRequest = false;
1698 for (unsigned int j=0;j<compressions.size();j++)
1699 {
1700 if (compressions[j].first == colName)
1701 {
1702 explicitRequest = true;
1703 if (displayText) cout << compressions[j].second << endl;
1704 if (compressions[j].second == "UNCOMPRESSED")
1705 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1706 if (compressions[j].second == "SMOOTHMAN")
1707 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1708 break;
1709 }
1710 }
1711
1712 if (explicitRequest) continue;
1713
1714 if (colName != "Data")
1715 {
1716 if (displayText) cout << "UNCOMPRESSED" << endl;
1717 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1718 }
1719 else
1720 {
1721 if (displayText) cout << "SMOOTHMAN" << endl;
1722 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1723 }
1724 }
1725
1726 //translate original header entries to their Z-version
1727 const fits::Table::Keys& header = inFile.GetKeys();
1728 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1729 {
1730 string k = i->first;//header[i].key();
1731 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1732 continue;
1733 if (k == "CHECKSUM")
1734 {
1735 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1736 continue;
1737 }
1738 if (k == "DATASUM")
1739 {
1740 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1741 continue;
1742 }
1743 k = k.substr(0,5);
1744 if (k == "TTYPE" || k == "TFORM")
1745 {
1746 string tmpKey = i->second.fitsString;
1747 tmpKey[0] = 'Z';
1748 outFile.setHeaderKey(tmpKey);
1749 continue;
1750 }
1751 if (k == "NAXIS")
1752 continue;
1753 outFile.setHeaderKey(i->second.fitsString);
1754// cout << i->first << endl;
1755 }
1756
1757 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", " 0", "Checksum of raw littlen endian data"));
1758
1759 //deal with the DRS calib
1760 int16_t* drsCalib16 = NULL;
1761
1762 //load the drs calib. data
1763 int32_t startCellOffset = -1;
1764 if (drsFileName != "")
1765 {
1766 drsCalib16 = new int16_t[1440*1024];
1767 float* drsCalibFloat = NULL;
1768 try
1769 {
1770 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1771 }
1772 catch (...)
1773 {
1774 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1775 return -1;
1776 }
1777
1778 //read the calibration and calculate its integer value
1779 drsFile->GetNextRow();
1780 for (uint32_t i=0;i<1440*1024;i++)
1781 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1782
1783
1784 //get the start cells offsets
1785 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1786 if (it->first == "StartCellData")
1787 {
1788 startCellOffset = it->second.offset;
1789 if (it->second.type != 'I')
1790 {
1791 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1792 return -1;
1793 }
1794 }
1795
1796 if (startCellOffset == -1)
1797 {
1798 cout << "WARNING: Could not find StartCellData in input file " << fileNameIn << ". Doing it uncalibrated"<< endl;
1799 }
1800 else
1801 {
1802 //assign it to the ouput file
1803 outFile.setDrsCalib(drsCalib16);
1804 }
1805 }
1806
1807 /************************************************************************************
1808 * Done configuring compression. Do the real job now !
1809 ************************************************************************************/
1810
1811 if (displayText) cout << "Converting file..." << endl;
1812
1813 int numSlices = -1;
1814 int32_t dataOffset = -1;
1815
1816 //Get the pointer to the column that must be drs-calibrated
1817 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1818 if (it->first == "Data")
1819 {
1820 numSlices = it->second.num;
1821 dataOffset = it->second.offset;
1822 }
1823 if (numSlices % 1440 != 0)
1824 {
1825 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1826 return -1;
1827 }
1828 if (dataOffset == -1)
1829 {
1830 cout << "Could not find the column Data in the input file. Aborting." << endl;
1831 return -1;
1832 }
1833
1834 numSlices /= 1440;
1835
1836 //set pointers to the readout data to later be able to gather it to "buffer".
1837 vector<void*> readPointers;
1838 vector<int32_t> readOffsets;
1839 vector<int32_t> readElemSize;
1840 vector<int32_t> readNumElems;
1841 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1842 {
1843 readPointers.push_back(inFile.SetPtrAddress(it->first));
1844 readOffsets.push_back(it->second.offset);
1845 readElemSize.push_back(it->second.size);
1846 readNumElems.push_back(it->second.num);
1847 }
1848
1849 Checksum rawsum;
1850 //Convert each row one after the other
1851 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1852 {
1853 if (displayText) cout << "\r Row " << i+1 << flush;
1854 inFile.GetNextRow();
1855 //copy from inFile internal structures to buffer
1856 int32_t count=0;
1857 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1858 {
1859 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1860 count++;
1861 }
1862
1863 char* checkSumPointer = buffer;
1864 const int chkOffset = (i*totalRowWidth)%4;
1865 checkSumPointer -= chkOffset;
1866 uint32_t sizeToChecksum = totalRowWidth + chkOffset;
1867 if (sizeToChecksum%4 != 0)
1868 {
1869 int32_t extraBytes = 4 - (sizeToChecksum%4);
1870 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
1871 sizeToChecksum += extraBytes;
1872 }
1873 rawsum.add(checkSumPointer, sizeToChecksum, false);
1874
1875 if (startCellOffset != -1)
1876 {//drs calibrate this data
1877 for (int j=0;j<1440;j++)
1878 {
1879 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1880 if (thisStartCell < 0) continue;
1881 for (int k=0;k<numSlices;k++)
1882 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1883 }
1884 }
1885 outFile.writeBinaryRow(buffer);
1886 };
1887 ostringstream strSum;
1888 strSum << rawsum.val();
1889 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", strSum.str(), "Checksum of raw littlen endian data"));
1890
1891 //Get table name for later use in case the compressed file is to be verified
1892 string tableName = inFile.GetStr("EXTNAME");
1893
1894 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1895
1896 inFile.close();
1897 if (!outFile.close())
1898 {
1899 cout << "Something went wrong while writing the catalog: negative index" << endl;
1900 return false;
1901 }
1902
1903 if (drsCalib16 != NULL)
1904 delete[] drsCalib16;
1905
1906 if (displayText) cout << "Done." << endl;
1907
1908 /************************************************************************************
1909 * Actual job done. Should we verify what we did ?
1910 ************************************************************************************/
1911
1912 if (verifyDataPlease)
1913 {
1914 if (displayText) cout << "Now verify data..." << endl;
1915 }
1916 else
1917 return 0;
1918
1919 //get a compressed reader
1920//TEMP try to copy the file too
1921// string copyName("/gpfs/scratch/fact/etienne/copyFile.fz");
1922// string copyName(fileNameOut+".copy");
1923 string copyName("");
1924 factfits verifyFile(fileNameOut, copyName, tableName, false);
1925
1926 //and the header of the compressed file
1927 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1928
1929 //get a non-compressed writer
1930 ofits reconstructedFile;
1931
1932 //figure out its name: /dev/null unless otherwise specified
1933 string reconstructedName = fileNameOut+".recons";
1934 reconstructedName = "/dev/null";
1935 reconstructedFile.open(reconstructedName.c_str(), false);
1936
1937 //reconstruct the original columns from the compressed file.
1938 string origChecksumStr;
1939 string origDatasum;
1940
1941 //reset tablename value so that it is re-read from compressed table's header
1942 tableName = "";
1943
1944 /************************************************************************************
1945 * Reconstruction setup done. Rebuild original header
1946 ************************************************************************************/
1947
1948 //re-tranlate the keys
1949 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1950 {
1951 string k = it->first;
1952 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1953 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1954 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1955 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1956 {
1957 continue;
1958 }
1959
1960 if (k == "ZCHKSUM")
1961 {
1962 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1963 origChecksumStr = it->second.value;
1964 continue;
1965 }
1966 if (k == "RAWSUM")
1967 {
1968 continue;
1969 }
1970
1971 if (k == "ZDTASUM")
1972 {
1973 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1974 origDatasum = it->second.value;
1975 continue;
1976 }
1977
1978 if (k == "EXTNAME")
1979 {
1980 tableName = it->second.value;
1981 }
1982
1983 k = k.substr(0,5);
1984
1985 if (k == "TTYPE")
1986 {//we have an original column name here.
1987 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1988 continue;
1989 }
1990
1991 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1992 {
1993 continue;
1994 }
1995
1996 if (k == "ZFORM" || k == "ZTYPE")
1997 {
1998 string tmpKey = it->second.fitsString;
1999 tmpKey[0] = 'T';
2000 reconstructedFile.SetKeyFromFitsString(tmpKey);
2001 continue;
2002 }
2003
2004 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
2005 }
2006
2007 if (tableName == "")
2008 {
2009 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
2010 return -1;
2011 }
2012
2013 //Restore the original columns
2014 for (uint32_t numCol=1; numCol<10000; numCol++)
2015 {
2016 ostringstream str;
2017 str << numCol;
2018 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
2019
2020 string ttype = verifyFile.GetStr("TTYPE"+str.str());
2021 string tform = verifyFile.GetStr("ZFORM"+str.str());
2022 char type = tform[tform.size()-1];
2023 string number = tform.substr(0, tform.size()-1);
2024 int numElems = atoi(number.c_str());
2025
2026 if (number == "") numElems=1;
2027
2028 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
2029 }
2030
2031 reconstructedFile.WriteTableHeader(tableName.c_str());
2032
2033 /************************************************************************************
2034 * Original header restored. Do the data
2035 ************************************************************************************/
2036
2037 //set pointers to the readout data to later be able to gather it to "buffer".
2038 readPointers.clear();
2039 readOffsets.clear();
2040 readElemSize.clear();
2041 readNumElems.clear();
2042 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2043 {
2044 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
2045 readOffsets.push_back(it->second.offset);
2046 readElemSize.push_back(it->second.size);
2047 readNumElems.push_back(it->second.num);
2048 }
2049
2050 //do the actual reconstruction work
2051 uint32_t i=1;
2052 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2053 {
2054 int count=0;
2055 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2056 {
2057 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2058 count++;
2059 }
2060 if (displayText) cout << "\r Row " << i << flush;
2061 reconstructedFile.WriteRow(buffer, rowWidth);
2062 i++;
2063 }
2064
2065 if (displayText) cout << endl;
2066
2067 //close reconstruction input and output
2068// Do NOT close the verify file, otherwise data cannot be flushed to copy file
2069// verifyFile.close();
2070 if (!verifyFile.IsFileOk())
2071 cout << "ERROR: file checksums seems wrong" << endl;
2072
2073 reconstructedFile.close();
2074
2075 //get original and reconstructed checksum and datasum
2076 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2077 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2078
2079 //verify that no mistake was made
2080 if (origChecksum.second != newChecksum.second)
2081 {
2082 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2083 return -1;
2084 }
2085 if (origChecksum.first != newChecksum.first)
2086 {
2087 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2088 }
2089 else
2090 {
2091 if (true) cout << "Ok" << endl;
2092 }
2093
2094 buffer = buffer-4;
2095 delete[] buffer;
2096 return 0;
2097}
2098
Note: See TracBrowser for help on using the repository browser.