source: trunk/FACT++/src/fitsCompressor.cc@ 16805

Last change on this file since 16805 was 16805, checked in by lyard, 11 years ago
Added TILE marker and size between tile data
File size: 80.6 KB
Line 
1/*
2 * fitsCompressor.cc
3 *
4 * Created on: May 7, 2013
5 * Author: lyard
6 */
7
8
9#include "Configuration.h"
10#include "../externals/factfits.h"
11#include "../externals/ofits.h"
12#include "../externals/checksum.h"
13
14#include <map>
15#include <fstream>
16#include <sstream>
17#include <iostream>
18
19
20using namespace std;
21
22class CompressedFitsFile
23{
24public:
25 class HeaderEntry
26 {
27 public:
28 /**
29 * Default constructor
30 */
31 HeaderEntry(): _key(""),
32 _value(""),
33 _comment(""),
34 _fitsString("")
35 {
36 }
37
38 /**
39 * Regular constructor.
40 * @param key the name of the keyword entry
41 * @param value its value
42 * @param comment an optionnal comment to be placed after the value
43 */
44 template<typename T>
45 HeaderEntry(const string& k,
46 const T& val,
47 const string& comm) : _key(k),
48 _value(""),
49 _comment(comm),
50 _fitsString("")
51 {
52 setValue(val);
53 }
54
55 /**
56 * From fits.h
57 */
58 string Trim(const string &str, char c=' ')
59 {
60 // Trim Both leading and trailing spaces
61 const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
62 const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
63
64 // if all spaces or empty return an empty string
65 if (string::npos==pstart || string::npos==pend)
66 return string();
67
68 return str.substr(pstart, pend-pstart+1);
69 }
70 /**
71 * Constructor from the original fits entry
72 */
73 HeaderEntry(const string& line)
74 {
75 _fitsString = line.data();
76
77 //parse the line
78 _key = Trim(line.substr(0,8));
79 //COMMENT and/or HISTORY values
80 if (line.substr(8,2)!= "= ")
81 {
82 _value = "";
83 _comment = Trim(line.substr(10));
84 return;
85 }
86 string next=line.substr(10);
87 const size_t slash = next.find_first_of('/');
88 _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
89 _comment = Trim(next.substr(slash+1));
90 }
91 /**
92 * Alternative constroctor from the fits entry
93 */
94 HeaderEntry(const vector<char>& lineVec)
95 {
96 HeaderEntry(string(lineVec.data()));
97 }
98 /**
99 * Destructor
100 */
101 virtual ~HeaderEntry(){}
102
103 const string& value() const {return _value;}
104 const string& key() const {return _key;}
105 const string& comment() const {return _comment;}
106 const string& fitsString() const {return _fitsString;}
107
108 /**
109 * Set a keyword value.
110 * @param value The value to be set
111 * @param update whether the value already exist or not. To be modified soon.
112 */
113 template<typename T>
114 void setValue(const T& val)
115 {
116 ostringstream str;
117 str << val;
118 _value = str.str();
119 buildFitsString();
120 };
121 /**
122 * Set the comment for a given entry
123 */
124 void setComment(const string& comm)
125 {
126 _comment = comm;
127 buildFitsString();
128 }
129
130 private:
131 /**
132 * Construct the FITS header string from the key, value and comment
133 */
134 void buildFitsString()
135 {
136 ostringstream str;
137 unsigned int totSize = 0;
138
139 // Tuncate the key if required
140 if (_key.length() > 8)
141 {
142 str << _key.substr(0, 8);
143 totSize += 8;
144 }
145 else
146 {
147 str << _key;
148 totSize += _key.length();
149 }
150
151 // Append space if key is less than 8 chars long
152 for (int i=totSize; i<8;i++)
153 {
154 str << " ";
155 totSize++;
156 }
157
158 // Add separator
159 str << "= ";
160 totSize += 2;
161
162 // Format value
163 if (_value.length() < 20)
164 for (;totSize<30-_value.length();totSize++)
165 str << " ";
166
167 if (_value.length() > 70)
168 {
169 str << _value.substr(0,70);
170 totSize += 70;
171 }
172 else
173 {
174 str << _value;
175 totSize += _value.size();
176 }
177
178 // If there is space remaining, add comment area
179 if (totSize < 77)
180 {
181 str << " / ";
182 totSize += 3;
183 if (totSize < 80)
184 {
185 unsigned int commentSize = 80 - totSize;
186 if (_comment.length() > commentSize)
187 {
188 str << _comment.substr(0,commentSize);
189 totSize += commentSize;
190 }
191 else
192 {
193 str << _comment;
194 totSize += _comment.length();
195 }
196 }
197 }
198
199 // If there is yet some free space, fill up the entry with spaces
200 for (int i=totSize; i<80;i++)
201 str << " ";
202
203 _fitsString = str.str();
204
205 // Check for correct completion
206 if (_fitsString.length() != 80)
207 cout << "Error |" << _fitsString << "| is not of length 80" << endl;
208 }
209
210 string _key; ///< the key (name) of the header entry
211 string _value; ///< the value of the header entry
212 string _comment; ///< the comment associated to the header entry
213 string _fitsString; ///< the string that will be written to the fits file
214 };
215
216 /**
217 * Supported compressions
218 */
219 typedef enum
220 {
221 UNCOMPRESSED,
222 SMOOTHMAN
223 } FitsCompression;
224
225 /**
226 * Columns class
227 */
228 class ColumnEntry
229 {
230 public:
231 /**
232 * Default constructor
233 */
234 ColumnEntry();
235
236 /**
237 * Default destructor
238 */
239 virtual ~ColumnEntry(){}
240
241 /**
242 * Constructor from values
243 * @param n the column name
244 * @param t the column type
245 * @param numof the number of entries in the column
246 * @param comp the compression type for this column
247 */
248 ColumnEntry(const string& n,
249 char t,
250 int numOf,
251 uint32_t comp) : _name(n),
252 _num(numOf),
253 _typeSize(0),
254 _offset(0),
255 _type(t),
256 _description(""),
257 _compression(comp)
258 {
259 switch (t)
260 {
261 case 'L':
262 case 'A':
263 case 'B': _typeSize = 1; break;
264 case 'I': _typeSize = 2; break;
265 case 'J':
266 case 'E': _typeSize = 4; break;
267 case 'K':
268 case 'D': _typeSize = 8; break;
269 default:
270 cout << "Error: typename " << t << " missing in the current implementation" << endl;
271 };
272
273 ostringstream str;
274 str << "data format of field: ";
275
276 switch (t)
277 {
278 case 'L': str << "1-byte BOOL"; break;
279 case 'A': str << "1-byte CHAR"; break;
280 case 'B': str << "BYTE"; break;
281 case 'I': str << "2-byte INTEGER"; break;
282 case 'J': str << "4-byte INTEGER"; break;
283 case 'K': str << "8-byte INTEGER"; break;
284 case 'E': str << "4-byte FLOAT"; break;
285 case 'D': str << "8-byte FLOAT"; break;
286 }
287
288 _description = str.str();
289 }
290
291 const string& name() const { return _name;};
292 int width() const { return _num*_typeSize;};
293 int offset() const { return _offset; };
294 int numElems() const { return _num; };
295 int sizeOfElems() const { return _typeSize;};
296 void setOffset(int off) { _offset = off;};
297 char type() const { return _type;};
298 string getDescription() const { return _description;}
299 uint32_t getCompression() const { return _compression;}
300
301 string getCompressionString() const
302 {
303 switch (_compression)
304 {
305 case UNCOMPRESSED: return "RAW";
306 case SMOOTHMAN: return "SMOO";
307 };
308 return "UNKNOWN";
309 }
310
311 private:
312
313 string _name; ///< name of the column
314 int _num; ///< number of elements contained in one row of this column
315 int _typeSize; ///< the number of bytes taken by one element
316 int _offset; ///< the offset of the column, in bytes, from the beginning of one row
317 char _type; ///< the type of the column, as specified by the fits documentation
318 string _description; ///< a description for the column. It will be placed in the header
319 uint32_t _compression; ///< the compression of the column. Only looked for if used by the compressed writer
320 };
321
322 public:
323 ///@brief default constructor. Assigns a default number of rows and tiles
324 CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
325
326 ///@brief default destructor
327 virtual ~CompressedFitsFile();
328
329 ///@brief get the header of the file
330 vector<HeaderEntry>& getHeaderEntries() { return _header;}
331
332 protected:
333 ///@brief protected function to allocate the intermediate buffers
334 bool reallocateBuffers();
335
336 //FITS related stuff
337 vector<HeaderEntry> _header; ///< Header keys
338 vector<ColumnEntry> _columns; ///< Columns in the file
339 uint32_t _numTiles; ///< Number of tiles (i.e. groups of rows)
340 uint32_t _numRowsPerTile; ///< Number of rows per tile
341 uint32_t _totalNumRows; ///< Total number of raws
342 uint32_t _rowWidth; ///< Total number of bytes in one row
343 bool _headerFlushed; ///< Flag telling whether the header record is synchronized with the data on disk
344 char* _buffer; ///< Memory buffer to store rows while they are not compressed
345 Checksum _checksum; ///< Checksum for asserting the consistency of the data
346 fstream _file; ///< The actual file streamer for accessing disk data
347
348 //compression related stuff
349 typedef pair<int64_t, int64_t> CatalogEntry;
350 typedef vector<CatalogEntry> CatalogRow;
351 typedef vector<CatalogRow> CatalogType;
352 CatalogType _catalog; ///< Catalog, i.e. the main table that points to the compressed data.
353 uint64_t _heapPtr; ///< the address in the file of the heap area
354 vector<char*> _transposedBuffer; ///< Memory buffer to store rows while they are transposed
355 vector<char*> _compressedBuffer; ///< Memory buffer to store rows while they are compressed
356
357 //thread related stuff
358 uint32_t _numThreads; ///< The number of threads that will be used to compress
359 uint32_t _threadIndex; ///< A variable to assign threads indices
360 vector<pthread_t> _thread; ///< The thread handler of the compressor
361 vector<uint32_t> _threadNumRows; ///< Total number of rows for thread to compress
362 vector<uint32_t> _threadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
363
364 //thread states. Not all used, but they do not hurt
365 static const uint32_t _THREAD_WAIT_; ///< Thread doing nothing
366 static const uint32_t _THREAD_COMPRESS_; ///< Thread working, compressing
367 static const uint32_t _THREAD_DECOMPRESS_; ///< Thread working, decompressing
368 static const uint32_t _THREAD_WRITE_; ///< Thread writing data to disk
369 static const uint32_t _THREAD_READ_; ///< Thread reading data from disk
370 static const uint32_t _THREAD_EXIT_; ///< Thread exiting
371
372 static HeaderEntry _dummyHeaderEntry; ///< Dummy entry for returning if requested on is not found
373};
374
375class CompressedFitsWriter : public CompressedFitsFile
376{
377 public:
378 ///@brief Default constructor. 100 tiles of 100 rows each are assigned by default
379 CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
380
381 ///@brief default destructor
382 virtual ~CompressedFitsWriter();
383
384 ///@brief add one column to the file
385 bool addColumn(const ColumnEntry& column);
386
387 ///@brief sets a given header key
388 bool setHeaderKey(const HeaderEntry&);
389
390 ///@brief open a new fits file
391 bool open(const string& fileName, const string& tableName="Data");
392
393 ///@brief close the opened file
394 bool close();
395
396 ///@brief write one row of data, already placed in bufferToWrite. Does the byte-swapping
397 bool writeBinaryRow(const char* bufferToWrite);
398
399 ///@brief assign a given (already loaded) drs calibration
400 void setDrsCalib(int16_t* data);
401
402 ///@brief set the number of worker threads compressing the data.
403 bool setNumWorkingThreads(uint32_t num);
404
405 private:
406 ///@brief compresses one buffer of data, the one given by threadIndex
407 uint64_t compressBuffer(uint32_t threadIndex);
408
409 ///@brief writes an already compressed buffer to disk
410 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
411
412 ///@brief add the header checksum to the datasum
413 void addHeaderChecksum(Checksum& checksum);
414
415 ///@brief write the header. If closingFile is set to true, checksum is calculated
416 void writeHeader(bool closingFile = false);
417
418 ///@brief write the compressed data catalog. If closingFile is set to true, checksum is calculated
419 void writeCatalog(bool closingFile=false);
420
421 /// FIXME this was a bad idea. Move everything to the regular header
422 vector<HeaderEntry> _defaultHeader;
423
424 /// the main function compressing the data
425 static void* threadFunction(void* context);
426
427 /// Write the drs calibration to disk, if any
428 void writeDrsCalib();
429
430 /// Copy and transpose (or semi-transpose) one tile of data
431 void copyTransposeTile(uint32_t index);
432
433 /// Specific compression functions
434 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
435 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
436 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
437
438 int32_t _checkOffset; ///< offset to the data pointer to calculate the checksum
439 int16_t* _drsCalibData; ///< array of the Drs baseline mean
440 int32_t _threadLooper; ///< Which thread will deal with the upcoming bunch of data ?
441 pthread_mutex_t _mutex; ///< mutex for compressing threads
442
443
444 static string _emptyBlock; ///< an empty block to be apened at the end of a file so that its length is a multiple of 2880
445 static string _fitsHeader; ///< the default header to be written in every fits file
446};
447
448const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
449const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
450const uint32_t CompressedFitsFile::_THREAD_DECOMPRESS_= 2;
451const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
452const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
453const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
454
455#define COMPRESSED_FLAG 0x1
456#define UNCOMPRESSED_FLAG 0x0
457
458typedef struct TileHeader
459{
460 char id[4];
461 uint64_t size;
462 TileHeader()
463 {
464 id[0]='T';
465 id[1]='I';
466 id[2]='L';
467 id[3]='E';
468 size=0;
469 };
470} __attribute__((__packed__)) TileHeader;
471
472template<>
473void CompressedFitsFile::HeaderEntry::setValue(const string& v)
474{
475 string val = v;
476 if (val.size() > 2 && val[0] == '\'')
477 {
478 size_t pos = val.find_last_of("'");
479 if (pos != string::npos && pos != 0)
480 val = val.substr(1, pos-1);
481 }
482 ostringstream str;
483
484 str << "'" << val << "'";
485 for (int i=str.str().length(); i<20;i++)
486 str << " ";
487 _value = str.str();
488 buildFitsString();
489}
490
491/**
492 * Default header to be written in all fits files
493 */
494string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
495 "BITPIX = 8 / number of bits per data pixel "
496 "NAXIS = 0 / number of data axes "
497 "EXTEND = T / FITS dataset may contain extensions "
498 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
499 "DATASUM = ' 0' / Checksum for the data block "
500 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
501 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
502 "END "
503 " "
504 " "
505 " "
506 " "
507 " "
508 " "
509 " "
510 " "
511 " "
512 " "
513 " "
514 " "
515 " "
516 " "
517 " "
518 " "
519 " "
520 " "
521 " "
522 " "
523 " "
524 " "
525 " "
526 " "
527 " "
528 " "
529 " ";
530
531/**
532 * Empty block to be appenned at the end of files, so that the length matches multiple of 2880 bytes
533 *
534 */
535string CompressedFitsWriter::_emptyBlock = " "
536 " "
537 " "
538 " "
539 " "
540 " "
541 " "
542 " "
543 " "
544 " "
545 " "
546 " "
547 " "
548 " "
549 " "
550 " "
551 " "
552 " "
553 " "
554 " "
555 " "
556 " "
557 " "
558 " "
559 " "
560 " "
561 " "
562 " "
563 " "
564 " "
565 " "
566 " "
567 " "
568 " "
569 " "
570 " ";
571
572CompressedFitsFile::HeaderEntry CompressedFitsFile::_dummyHeaderEntry;
573/****************************************************************
574 * SUPER CLASS DEFAULT CONSTRUCTOR
575 ****************************************************************/
576CompressedFitsFile::CompressedFitsFile(uint32_t numTiles,
577 uint32_t numRowsPerTile) : _header(),
578 _columns(),
579 _numTiles(numTiles),
580 _numRowsPerTile(numRowsPerTile),
581 _totalNumRows(0),
582 _rowWidth(0),
583 _headerFlushed(false),
584 _buffer(NULL),
585 _checksum(0),
586 _heapPtr(0),
587 _transposedBuffer(1),
588 _compressedBuffer(1),
589 _numThreads(1),
590 _threadIndex(0),
591 _thread(1),
592 _threadNumRows(1),
593 _threadStatus(1)
594{
595 _catalog.resize(_numTiles);
596 _transposedBuffer[0] = NULL;
597 _compressedBuffer[0] = NULL;
598 _threadStatus[0] = _THREAD_WAIT_;
599 _threadNumRows[0] = 0;
600}
601
602/****************************************************************
603 * SUPER CLASS DEFAULT DESTRUCTOR
604 ****************************************************************/
605CompressedFitsFile::~CompressedFitsFile()
606{
607 if (_buffer != NULL)
608 {
609 delete[] _buffer;
610 _buffer = NULL;
611 for (uint32_t i=0;i<_numThreads;i++)
612 {
613 _compressedBuffer[i] = _compressedBuffer[i]-4;
614 delete[] _transposedBuffer[i];
615 delete[] _compressedBuffer[i];
616 _transposedBuffer[i] = NULL;
617 _compressedBuffer[i] = NULL;
618 }
619 }
620 if (_file.is_open())
621 _file.close();
622}
623
624/****************************************************************
625 * REALLOCATE BUFFER
626 ****************************************************************/
627bool CompressedFitsFile::reallocateBuffers()
628{
629 if (_buffer)
630 {
631 delete[] _buffer;
632 for (uint32_t i=0;i<_compressedBuffer.size();i++)
633 {
634 _compressedBuffer[i] = _compressedBuffer[i]-4;
635 delete[] _transposedBuffer[i];
636 delete[] _compressedBuffer[i];
637 }
638 }
639 _buffer = new char[_rowWidth*_numRowsPerTile];
640 if (_buffer == NULL) return false;
641 if (_compressedBuffer.size() != _numThreads)
642 {
643 _transposedBuffer.resize(_numThreads);
644 _compressedBuffer.resize(_numThreads);
645 }
646 for (uint32_t i=0;i<_numThreads;i++)
647 {
648 _transposedBuffer[i] = new char[_rowWidth*_numRowsPerTile];
649 _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader)]; //use a bit more memory for compression flags
650 if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
651 return false;
652 //shift the compressed buffer by 4 bytes, for checksum calculation
653 memset(_compressedBuffer[i], 0, 4);
654 _compressedBuffer[i] = _compressedBuffer[i]+4;
655 //initialize the tile header
656 TileHeader tileHeader;
657 memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
658 }
659 return true;
660}
661
662/****************************************************************
663 * DEFAULT WRITER CONSTRUCTOR
664 ****************************************************************/
665CompressedFitsWriter::CompressedFitsWriter(uint32_t numTiles,
666 uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
667 _checkOffset(0),
668 _drsCalibData(NULL),
669 _threadLooper(0)
670{
671 _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
672 _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
673 _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
674 _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
675 _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
676 _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
677 _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
678 _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
679 _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
680 _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
681 //compression stuff
682 _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
683 _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
684 _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
685 _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
686 _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
687 _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
688 _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
689
690 pthread_mutex_init(&_mutex, NULL);
691}
692
693/****************************************************************
694 * DEFAULT DESTRUCTOR
695 ****************************************************************/
696CompressedFitsWriter::~CompressedFitsWriter()
697{
698 pthread_mutex_destroy(&_mutex);
699}
700
701/****************************************************************
702 * SET THE POINTER TO THE DRS CALIBRATION
703 ****************************************************************/
704void CompressedFitsWriter::setDrsCalib(int16_t* data)
705{
706 _drsCalibData = data;
707}
708
709/****************************************************************
710 * SET NUM WORKING THREADS
711 ****************************************************************/
712bool CompressedFitsWriter::setNumWorkingThreads(uint32_t num)
713{
714 if (_file.is_open())
715 return false;
716 if (num < 1 || num > 64)
717 {
718 cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
719 return false;
720 }
721 _numThreads = num;
722 _transposedBuffer[0] = NULL;
723 _compressedBuffer[0] = NULL;
724 _threadStatus.resize(num);
725 _thread.resize(num);
726 _threadNumRows.resize(num);
727 for (uint32_t i=0;i<num;i++)
728 {
729 _threadNumRows[i] = 0;
730 _threadStatus[i] = _THREAD_WAIT_;
731 }
732 return reallocateBuffers();
733}
734
735/****************************************************************
736 * WRITE DRS CALIBRATION TO FILE
737 ****************************************************************/
738void CompressedFitsWriter::writeDrsCalib()
739{
740 //if file was not loaded, ignore
741 if (_drsCalibData == NULL)
742 return;
743 uint64_t whereDidIStart = _file.tellp();
744 vector<HeaderEntry> header;
745 header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
746 header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
747 header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
748 header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
749 header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
750 header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
751 header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
752 header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
753 header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
754 header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
755 header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
756 header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
757 header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
758
759 for (uint32_t i=0;i<header.size();i++)
760 _file.write(header[i].fitsString().c_str(), 80);
761 //End the header
762 _file.write("END ", 80);
763 long here = _file.tellp();
764 if (here%2880)
765 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
766 //now write the data itself
767 int16_t* swappedBytes = new int16_t[1024];
768 Checksum checksum;
769 for (int32_t i=0;i<1440;i++)
770 {
771 memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
772 for (int32_t j=0;j<2048;j+=2)
773 {
774 int8_t inter;
775 inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
776 reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
777 reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
778 }
779 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
780 checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
781 }
782 uint64_t whereDidIStop = _file.tellp();
783 delete[] swappedBytes;
784 //No need to pad the data, as (1440*1024*2)%2880==0
785
786 //calculate the checksum from the header
787 ostringstream str;
788 str << checksum.val();
789 header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
790 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
791 checksum.add(it->fitsString().c_str(), 80);
792 string end("END ");
793 string space(" ");
794 checksum.add(end.c_str(), 80);
795 int headerRowsLeft = 36 - (header.size() + 1)%36;
796 for (int i=0;i<headerRowsLeft;i++)
797 checksum.add(space.c_str(), 80);
798 //udpate the checksum keyword
799 header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
800 //and eventually re-write the header data
801 _file.seekp(whereDidIStart);
802 for (uint32_t i=0;i<header.size();i++)
803 _file.write(header[i].fitsString().c_str(), 80);
804 _file.seekp(whereDidIStop);
805}
806
807/****************************************************************
808 * ADD COLUMN
809 ****************************************************************/
810bool CompressedFitsWriter::addColumn(const ColumnEntry& column)
811{
812 if (_totalNumRows != 0)
813 {
814 cout << "Error: cannot add new columns once first row has been written" << endl;
815 return false;
816 }
817 for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
818 {
819 if (it->name() == column.name())
820 {
821 cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
822 return false;
823 }
824 }
825 _columns.push_back(column);
826 _columns.back().setOffset(_rowWidth);
827 _rowWidth += column.width();
828 reallocateBuffers();
829
830 ostringstream str, str2, str3;
831 str << "TTYPE" << _columns.size();
832 str2 << column.name();
833 str3 << "label for field ";
834 if (_columns.size() < 10) str3 << " ";
835 if (_columns.size() < 100) str3 << " ";
836 str3 << _columns.size();
837 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
838 str.str("");
839 str2.str("");
840 str3.str("");
841 str << "TFORM" << _columns.size();
842 str2 << "1QB";
843 str3 << "data format of field " << _columns.size();
844 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
845 str.str("");
846 str2.str("");
847 str3.str("");
848 str << "ZFORM" << _columns.size();
849 str2 << column.numElems() << column.type();
850 str3 << "Original format of field " << _columns.size();
851 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
852 str.str("");
853 str2.str("");
854 str3.str("");
855 str << "ZCTYP" << _columns.size();
856 str2 << column.getCompressionString();
857 str3 << "Comp. Scheme of field " << _columns.size();
858 setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
859 //resize the catalog vector accordingly
860 for (uint32_t i=0;i<_numTiles;i++)
861 {
862 _catalog[i].resize(_columns.size());
863 for (uint32_t j=0;j<_catalog[i].size();j++)
864 _catalog[i][j] = make_pair(0,0);
865 }
866 return true;
867}
868
869/****************************************************************
870 * SET HEADER KEY
871 ****************************************************************/
872bool CompressedFitsWriter::setHeaderKey(const HeaderEntry& entry)
873{
874 HeaderEntry ent = entry;
875 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
876 {
877 if (it->key() == entry.key())
878 {
879 if (entry.comment() == "")
880 ent.setComment(it->comment());
881 (*it) = ent;
882 _headerFlushed = false;
883 return true;
884 }
885 }
886 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
887 {
888 if (it->key() == entry.key())
889 {
890 if (entry.comment() == "")
891 ent.setComment(it->comment());
892 (*it) = ent;
893 _headerFlushed = false;
894 return true;
895 }
896 }
897 if (_totalNumRows != 0)
898 {
899 cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
900 return false;
901 }
902 _header.push_back(entry);
903 _headerFlushed = false;
904 return true;
905}
906
907/****************************************************************
908 * OPEN
909 ****************************************************************/
910bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
911{
912 _file.open(fileName.c_str(), ios_base::out);
913 if (!_file.is_open())
914 {
915 cout << "Error: Could not open the file (" << fileName << ")." << endl;
916 return false;
917 }
918 _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
919 _headerFlushed = false;
920 _threadIndex = 0;
921 //create requested number of threads
922 for (uint32_t i=0;i<_numThreads;i++)
923 pthread_create(&(_thread[i]), NULL, threadFunction, this);
924 //wait for the threads to start
925 while (_numThreads != _threadIndex)
926 usleep(1000);
927 //set the writing fence to the last thread
928 _threadIndex = _numThreads-1;
929 return (_file.good());
930}
931
932/****************************************************************
933 * WRITE HEADER
934 ****************************************************************/
935void CompressedFitsWriter::writeHeader(bool closingFile)
936{
937 if (_headerFlushed)
938 return;
939 if (!_file.is_open())
940 return;
941
942 long cPos = _file.tellp();
943
944 _file.seekp(0);
945
946 _file.write(_fitsHeader.c_str(), 2880);
947
948 //Write the DRS calib table here !
949 writeDrsCalib();
950
951 //we are now at the beginning of the main table. Write its header
952 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
953 _file.write(it->fitsString().c_str(), 80);
954
955 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
956 _file.write(it->fitsString().c_str(), 80);
957
958 _file.write("END ", 80);
959 long here = _file.tellp();
960 if (here%2880)
961 _file.write(_emptyBlock.c_str(), 2880 - here%2880);
962
963 _headerFlushed = true;
964
965 here = _file.tellp();
966
967 if (here%2880)
968 cout << "Error: seems that header did not finish at the end of a block." << endl;
969
970 if (here > cPos && cPos != 0)
971 {
972 cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
973 return;
974 }
975
976 here = _file.tellp();
977 writeCatalog(closingFile);
978
979 here = _file.tellp() - here;
980 _heapPtr = here;
981
982 if (cPos != 0)
983 _file.seekp(cPos);
984}
985
986/****************************************************************
987 * WRITE CATALOG
988 * WARNING: writeCatalog is only meant to be used by writeHeader.
989 * external usage will most likely corrupt the file
990 ****************************************************************/
991void CompressedFitsWriter::writeCatalog(bool closingFile)
992{
993 uint32_t sizeWritten = 0;
994 for (uint32_t i=0;i<_catalog.size();i++)
995 {
996 for (uint32_t j=0;j<_catalog[i].size();j++)
997 {
998 //swap the bytes
999 int8_t swappedEntry[16];
1000 swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1001 swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1002 swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1003 swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1004 swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1005 swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1006 swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1007 swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1008
1009 swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1010 swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1011 swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1012 swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1013 swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1014 swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1015 swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1016 swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1017 if (closingFile)
1018 {
1019 _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1020 }
1021 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1022 sizeWritten += 2*sizeof(int64_t);
1023 }
1024 }
1025
1026 //we do not reserve space for now because fverify does not like that.
1027 //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1028 return ;
1029
1030 //write the padding so that the HEAP section starts at a 2880 bytes boundary
1031 if (sizeWritten % 2880 != 0)
1032 {
1033 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1034 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1035 }
1036}
1037
1038/****************************************************************
1039 * ADD HEADER CHECKSUM
1040 ****************************************************************/
1041void CompressedFitsWriter::addHeaderChecksum(Checksum& checksum)
1042{
1043 for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1044 _checksum.add(it->fitsString().c_str(), 80);
1045 for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1046 _checksum.add(it->fitsString().c_str(), 80);
1047 string end("END ");
1048 string space(" ");
1049 checksum.add(end.c_str(), 80);
1050 int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1051 for (int i=0;i<headerRowsLeft;i++)
1052 checksum.add(space.c_str(), 80);
1053}
1054
1055/****************************************************************
1056 * CLOSE
1057 ****************************************************************/
1058bool CompressedFitsWriter::close()
1059{
1060 for (uint32_t i=0;i<_numThreads;i++)
1061 while (_threadStatus[i] != _THREAD_WAIT_)
1062 usleep(100000);
1063 for (uint32_t i=0;i<_numThreads;i++)
1064 _threadStatus[i] = _THREAD_EXIT_;
1065 for (uint32_t i=0;i<_numThreads;i++)
1066 pthread_join(_thread[i], NULL);
1067 //flush the rows that were not written yet
1068 if (_totalNumRows%_numRowsPerTile != 0)
1069 {
1070 copyTransposeTile(0);
1071
1072 _threadNumRows[0] = _totalNumRows;
1073 uint32_t numBytes = compressBuffer(0);
1074 writeCompressedDataToDisk(0, numBytes);
1075 }
1076 //compression stuff
1077 setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1078 setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1079 //TODO calculate the real offset from the main table to the start of the HEAP data area
1080 setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1081 setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1082
1083 //regular stuff
1084 if (_catalog.size() > 0)
1085 {
1086 setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1087 setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1088 setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1089 int64_t heapSize = 0;
1090 int64_t compressedOffset = 0;
1091 for (uint32_t i=0;i<_catalog.size();i++)
1092 {
1093 compressedOffset += sizeof(TileHeader);
1094 heapSize += compressedOffset;
1095 for (uint32_t j=0;j<_catalog[i].size();j++)
1096 {
1097 heapSize += _catalog[i][j].first;
1098 //set the catalog offsets to their actual values
1099 _catalog[i][j].second = compressedOffset;
1100 compressedOffset += _catalog[i][j].first;
1101 //special case if entry has zero length
1102 if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1103 }
1104 }
1105 setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1106 }
1107 writeHeader(true);
1108 ostringstream str;
1109 str << _checksum.val();
1110 setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1111 addHeaderChecksum(_checksum);
1112 setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1113 //update header value
1114 writeHeader();
1115 //update file length
1116 long here = _file.tellp();
1117 if (here%2880)
1118 {
1119 vector<char> nullVec(2880 - here%2880, 0);
1120 _file.write(nullVec.data(), 2880 - here%2880);
1121 }
1122 _file.close();
1123 return true;
1124}
1125
1126/****************************************************************
1127 * COPY TRANSPOSE TILE
1128 ****************************************************************/
1129void CompressedFitsWriter::copyTransposeTile(uint32_t index)
1130{
1131 uint32_t thisRoundNumRows = (_totalNumRows%_numRowsPerTile) ? _totalNumRows%_numRowsPerTile : _numRowsPerTile;
1132 //copy the tile and transpose it
1133 uint32_t offset = 0;
1134 for (uint32_t i=0;i<_columns.size();i++)
1135 {
1136 switch (_columns[i].getCompression())
1137 {
1138 case UNCOMPRESSED:
1139 case SMOOTHMAN:
1140 for (uint32_t k=0;k<thisRoundNumRows;k++)
1141 {//regular, "semi-transposed" copy
1142 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1143 offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1144 }
1145 break;
1146 default :
1147 for (int j=0;j<_columns[i].numElems();j++)
1148 for (uint32_t k=0;k<thisRoundNumRows;k++)
1149 {//transposed copy
1150 memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1151 offset += _columns[i].sizeOfElems();
1152 }
1153 break;
1154
1155 };
1156 }
1157}
1158
1159/****************************************************************
1160 * WRITE BINARY ROW
1161 ****************************************************************/
1162bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1163{
1164 if (_totalNumRows == 0)
1165 writeHeader();
1166
1167 memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1168 _totalNumRows++;
1169 if (_totalNumRows%_numRowsPerTile == 0)
1170 {
1171 //which is the next thread that we should use ?
1172 while (_threadStatus[_threadLooper] == _THREAD_COMPRESS_)
1173 usleep(100000);
1174
1175 copyTransposeTile(_threadLooper);
1176
1177 while (_threadStatus[_threadLooper] != _THREAD_WAIT_)
1178 usleep(100000);
1179
1180 _threadNumRows[_threadLooper] = _totalNumRows;
1181 _threadStatus[_threadLooper] = _THREAD_COMPRESS_;
1182 _threadLooper = (_threadLooper+1)%_numThreads;
1183 }
1184 return _file.good();
1185}
1186
1187/****************************************************************
1188 * COMPRESS BUFFER
1189 ****************************************************************/
1190uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1191{
1192 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1193 return numRows*sizeOfElems*numRowElems;
1194}
1195
1196/****************************************************************
1197 * COMPRESS BUFFER
1198 ****************************************************************/
1199uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1200{
1201 string huffmanOutput;
1202 uint32_t previousHuffmanSize = 0;
1203 if (numRows < 2)
1204 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1205 return numRows*sizeOfElems*numRowElems + 1000;
1206 }
1207 if (sizeOfElems < 2 )
1208 {
1209 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1210 return 0;
1211 }
1212 uint32_t huffmanOffset = 0;
1213 for (uint32_t j=0;j<numRowElems;j++)
1214 {
1215 Huffman::Encode(huffmanOutput,
1216 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1217 numRows*(sizeOfElems/2));
1218 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1219 huffmanOffset += sizeof(uint32_t);
1220 previousHuffmanSize = huffmanOutput.size();
1221 }
1222 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1223
1224 //only copy if not larger than not-compressed size
1225 if (totalSize < numRows*sizeOfElems*numRowElems)
1226 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1227
1228 return totalSize;
1229}
1230
1231/****************************************************************
1232 * COMPRESS BUFFER
1233 ****************************************************************/
1234uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1235{
1236 uint32_t colWidth = numRowElems;
1237 for (int j=colWidth*numRows-1;j>1;j--)
1238 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1239 //call the huffman transposed
1240 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1241}
1242
1243/****************************************************************
1244 * COMPRESS BUFFER
1245 ****************************************************************/
1246uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1247{
1248 uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1249 uint32_t offset=0;
1250 uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1251 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1252
1253 //now compress each column one by one by calling compression on arrays
1254 for (uint32_t i=0;i<_columns.size();i++)
1255 {
1256 _catalog[currentCatalogRow][i].second = compressedOffset;
1257
1258 uint32_t compression = _columns[i].getCompression();
1259
1260 if (_columns[i].numElems() == 0) continue;
1261 //set the default byte telling if uncompressed the compressed Flag
1262 uint64_t previousOffset = compressedOffset;
1263 _compressedBuffer[threadIndex][compressedOffset++] = COMPRESSED_FLAG;
1264 switch (compression)
1265 {
1266 case UNCOMPRESSED:
1267 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1268 break;
1269 case SMOOTHMAN:
1270 compressedOffset += compressSMOOTHMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1271 break;
1272
1273 default:
1274 ;
1275 };
1276 //check if compressed size is larger than uncompressed
1277 if (compression != UNCOMPRESSED &&
1278 compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+1)
1279 {//if so set flag and redo it uncompressed
1280 compressedOffset = previousOffset;
1281 _compressedBuffer[threadIndex][compressedOffset++] = UNCOMPRESSED_FLAG;
1282 compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1283 }
1284 offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1285 _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1286 }
1287 memcpy(&(_compressedBuffer[threadIndex][4]), &compressedOffset, sizeof(uint64_t));
1288 return compressedOffset;
1289}
1290
1291/****************************************************************
1292 * WRITE COMPRESS DATA TO DISK
1293 ****************************************************************/
1294bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1295{
1296 char* checkSumPointer = _compressedBuffer[threadID];
1297 int32_t extraBytes = 0;
1298 uint32_t sizeToChecksum = sizeToWrite;
1299 if (_checkOffset != 0)
1300 {//should we extend the array to the left ?
1301 sizeToChecksum += _checkOffset;
1302 checkSumPointer -= _checkOffset;
1303 memset(checkSumPointer, 0, _checkOffset);
1304 }
1305 if (sizeToChecksum%4 != 0)
1306 {//should we extend the array to the right ?
1307 extraBytes = 4 - (sizeToChecksum%4);
1308 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1309 sizeToChecksum += extraBytes;
1310 }
1311 //do the checksum
1312 _checksum.add(checkSumPointer, sizeToChecksum);
1313 _checkOffset = (4 - extraBytes)%4;
1314 //write data to disk
1315 _file.write(_compressedBuffer[threadID], sizeToWrite);
1316 return _file.good();
1317}
1318
1319/****************************************************************
1320 * WRITER THREAD LOOP
1321 ****************************************************************/
1322void* CompressedFitsWriter::threadFunction(void* context)
1323{
1324 CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1325
1326 uint32_t myID = 0;
1327 pthread_mutex_lock(&(myself->_mutex));
1328 myID = myself->_threadIndex++;
1329 pthread_mutex_unlock(&(myself->_mutex));
1330 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1331 while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1332 {
1333 while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1334 usleep(100000);
1335 if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1336 continue;
1337 uint32_t numBytes = myself->compressBuffer(myID);
1338 myself->_threadStatus[myID] = _THREAD_WRITE_;
1339
1340 //wait for the previous data to be written
1341 while (myself->_threadIndex != threadToWaitForBeforeWriting)
1342 usleep(1000);
1343 //do the actual writing to disk
1344 pthread_mutex_lock(&(myself->_mutex));
1345 myself->writeCompressedDataToDisk(myID, numBytes);
1346 myself->_threadIndex = myID;
1347 pthread_mutex_unlock(&(myself->_mutex));
1348 myself->_threadStatus[myID] = _THREAD_WAIT_;
1349 }
1350 return NULL;
1351}
1352
1353/****************************************************************
1354 * PRINT USAGE
1355 ****************************************************************/
1356void printUsage()
1357{
1358 cout << endl;
1359 cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1360 " and compresses it.\n It can use a drs calibration in order to"
1361 " improve the compression ratio. If so, the input calibration"
1362 " is embedded into the compressed file.\n"
1363 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1364 " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1365 "Usage: Compressed_Fits_Test <inputFile>";
1366 cout << endl;
1367}
1368
1369/****************************************************************
1370 * PRINT HELP
1371 ****************************************************************/
1372void printHelp()
1373{
1374 cout << endl;
1375 cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1376 "The fz extension will be added, replacing the .gz one if required \n"
1377 "If output is specified, then it will replace the automatically generated output filename\n"
1378 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1379 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1380 "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1381 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1382 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1383 "UNCOMPRESSED\n"
1384 "AMPLITUDE\n"
1385 "HUFFMAN\n"
1386 "SMOOTHMAN\n"
1387 "INT_WAVELET\n"
1388 "\n"
1389 "--quiet removes any textual output, except error messages\n"
1390 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1391 ;
1392 cout << endl << endl;
1393}
1394
1395/****************************************************************
1396 * SETUP CONFIGURATION
1397 ****************************************************************/
1398void setupConfiguration(Configuration& conf)
1399{
1400 po::options_description configs("FitsCompressor options");
1401 configs.add_options()
1402 ("inputFile,i", vars<string>(), "Input file")
1403 ("drs,d", var<string>(), "Input drs calibration file")
1404 ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1405 ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1406 ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1407 ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1408 ("quiet,q", po_switch(), "Should the program display any text at all ?")
1409 ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1410 ;
1411 po::positional_options_description positional;
1412 positional.add("inputFile", -1);
1413 conf.AddOptions(configs);
1414 conf.SetArgumentPositions(positional);
1415}
1416
1417/****************************************************************
1418 * MAIN
1419 ****************************************************************/
1420int main(int argc, const char** argv)
1421{
1422 Configuration conf(argv[0]);
1423 conf.SetPrintUsage(printUsage);
1424 setupConfiguration(conf);
1425
1426 if (!conf.DoParse(argc, argv, printHelp))
1427 return -1;
1428
1429 //initialize the file names to nothing.
1430 string fileNameIn = "";
1431 string fileNameOut = "";
1432 string drsFileName = "";
1433 uint32_t numRowsPerTile = 100;
1434 bool displayText=true;
1435
1436 //parse configuration
1437 if (conf.Get<bool>("quiet")) displayText = false;
1438 const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1439 if (inputFileNameVec.size() != 1)
1440 {
1441 cout << "Error: ";
1442 if (inputFileNameVec.size() == 0) cout << "no";
1443 else cout << inputFileNameVec.size();
1444 cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1445 for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1446 cout << inputFileNameVec[i] << endl;
1447 return -1;
1448 }
1449
1450 //Assign the input filename
1451 fileNameIn = inputFileNameVec[0];
1452
1453 //Check if we have a drs calib too
1454 if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1455
1456 //Should we verify the data ?
1457 bool verifyDataPlease = false;
1458 if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1459
1460
1461 //should we use a specific output filename ?
1462 if (conf.Has("output"))
1463 fileNameOut = conf.Get<string>("output");
1464 else
1465 {
1466 size_t pos = fileNameIn.find(".fits");
1467 if (pos == string::npos)
1468 {
1469 cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1470 return -1;
1471 }
1472 fileNameOut = fileNameIn.substr(0, pos) + ".fz";
1473 }
1474
1475 //should we use specific compression on some columns ?
1476 const vector<string> columnsCompression = conf.Vec<string>("compression");
1477
1478 //split up values between column names and compression scheme
1479 vector<std::pair<string, string>> compressions;
1480 for (unsigned int i=0;i<columnsCompression.size();i++)
1481 {
1482 size_t pos = columnsCompression[i].find_first_of("=");
1483 if (pos == string::npos)
1484 {
1485 cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1486 return -1;
1487 }
1488 string comp = columnsCompression[i].substr(pos+1);
1489 if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1490 comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1491 {
1492 cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1493 return -1;
1494 }
1495 compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1496 }
1497
1498 //How many rows per tile should we use ?
1499 if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1500
1501 /************************************************************************************
1502 * Done reading configuration. Open relevant files
1503 ************************************************************************************/
1504
1505 //Open input's fits file
1506 factfits inFile(fileNameIn);
1507
1508 if (inFile.IsCompressedFITS())
1509 {
1510 cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1511 return -1;
1512 }
1513
1514 //decide how many tiles should be put in the compressed file
1515 uint32_t originalNumRows = inFile.GetNumRows();
1516 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1517 CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1518
1519 //should we use a specific number of threads for compressing ?
1520 unsigned int numThreads = 1;
1521 if (conf.Has("threads"))
1522 {
1523 numThreads = conf.Get<unsigned int>("threads");
1524 outFile.setNumWorkingThreads(numThreads);
1525 }
1526
1527 if (!outFile.open(fileNameOut))
1528 {
1529 cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1530 return -1;
1531 }
1532
1533 //Because the file to open MUST be given by the constructor, I must use a pointer instead
1534 factfits* drsFile = NULL;
1535 //try to open the Drs file. If any.
1536 if (drsFileName != "")
1537 {
1538 try
1539 {
1540 drsFile = new factfits(drsFileName);
1541 }
1542 catch (...)
1543 {
1544 cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1545 return -1;
1546 }
1547 }
1548
1549 if (displayText)
1550 {
1551 cout << endl;
1552 cout << "**********************" << endl;
1553 cout << "Will compress from : " << fileNameIn << endl;
1554 cout << "to : " << fileNameOut << endl;
1555 if (drsFileName != "")
1556 cout << "while calibrating with: " << drsFileName << endl;
1557 cout << "Compression will use : " << numThreads << " worker threads" << endl;
1558 cout << "Data will be verified : ";
1559 if (verifyDataPlease)
1560 cout << "yes" << endl;
1561 else
1562 cout << "no (WARNING !)" << endl;
1563 cout << "**********************" << endl;
1564 cout << endl;
1565 }
1566
1567 /************************************************************************************
1568 * Done opening input files. Allocate memory and configure output file
1569 ************************************************************************************/
1570
1571 //allocate the buffer for temporary storage of each read/written row
1572 uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1573 char* buffer = new char[rowWidth];
1574
1575 //get the source columns
1576 const fits::Table::Columns& columns = inFile.GetColumns();
1577 const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1578 if (displayText)
1579 cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1580
1581 //Add columns.
1582 for (uint32_t i=0;i<sortedColumns.size(); i++)
1583 {
1584 //get column name
1585 ostringstream str;
1586 str << "TTYPE" << i+1;
1587 string colName = inFile.GetStr(str.str());
1588 if (displayText)
1589 {
1590 cout << "Column " << colName;
1591 for (uint32_t j=colName.size();j<21;j++)
1592 cout << " ";
1593 cout << " -> ";
1594 }
1595
1596 //first lets see if we do have an explicit request
1597 bool explicitRequest = false;
1598 for (unsigned int j=0;j<compressions.size();j++)
1599 {
1600 if (compressions[j].first == colName)
1601 {
1602 explicitRequest = true;
1603 if (displayText) cout << compressions[j].second << endl;
1604 if (compressions[j].second == "UNCOMPRESSED")
1605 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, CompressedFitsFile::UNCOMPRESSED));
1606 if (compressions[j].second == "SMOOTHMAN")
1607 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, CompressedFitsFile::SMOOTHMAN));
1608 break;
1609 }
1610 }
1611
1612 if (explicitRequest) continue;
1613
1614 if (colName != "Data")
1615 {
1616 if (displayText) cout << "UNCOMPRESSED" << endl;
1617 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, CompressedFitsFile::UNCOMPRESSED));
1618 }
1619 else
1620 {
1621 if (displayText) cout << "SMOOTHMAN" << endl;
1622 outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, CompressedFitsFile::SMOOTHMAN));
1623 }
1624 }
1625
1626 //translate original header entries to their Z-version
1627 const fits::Table::Keys& header = inFile.GetKeys();
1628 for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1629 {
1630 string k = i->first;//header[i].key();
1631 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1632 continue;
1633 if (k == "CHECKSUM")
1634 {
1635 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1636 continue;
1637 }
1638 if (k == "DATASUM")
1639 {
1640 outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1641 continue;
1642 }
1643 k = k.substr(0,5);
1644 if (k == "TTYPE" || k == "TFORM")
1645 {
1646 string tmpKey = i->second.fitsString;
1647 tmpKey[0] = 'Z';
1648 outFile.setHeaderKey(tmpKey);
1649 continue;
1650 }
1651 if (k == "NAXIS")
1652 continue;
1653 outFile.setHeaderKey(i->second.fitsString);
1654 }
1655
1656 //deal with the DRS calib
1657 int16_t* drsCalib16 = NULL;
1658
1659 //load the drs calib. data
1660 int32_t startCellOffset = -1;
1661 if (drsFileName != "")
1662 {
1663 drsCalib16 = new int16_t[1440*1024];
1664 float* drsCalibFloat = NULL;
1665 try
1666 {
1667 drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1668 }
1669 catch (...)
1670 {
1671 cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1672 return -1;
1673 }
1674
1675 //read the calibration and calculate its integer value
1676 drsFile->GetNextRow();
1677 for (uint32_t i=0;i<1440*1024;i++)
1678 drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1679
1680 //assign it to the ouput file
1681 outFile.setDrsCalib(drsCalib16);
1682
1683 //get the start cells offsets
1684 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1685 if (it->first == "StartCellData")
1686 {
1687 startCellOffset = it->second.offset;
1688 if (it->second.type != 'I')
1689 {
1690 cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1691 return -1;
1692 }
1693 }
1694
1695 if (startCellOffset == -1)
1696 {
1697 cout << "Could not find StartCellData in input file " << fileNameIn << ". Aborting."<< endl;
1698 return -1;
1699 }
1700 }
1701
1702 /************************************************************************************
1703 * Done configuring compression. Do the real job now !
1704 ************************************************************************************/
1705
1706 if (displayText) cout << "Converting file..." << endl;
1707
1708 int numSlices = -1;
1709 int32_t dataOffset = -1;
1710
1711 //Get the pointer to the column that must be drs-calibrated
1712 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1713 if (it->first == "Data")
1714 {
1715 numSlices = it->second.num;
1716 dataOffset = it->second.offset;
1717 }
1718 if (numSlices % 1440 != 0)
1719 {
1720 cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1721 return -1;
1722 }
1723 if (dataOffset == -1)
1724 {
1725 cout << "Could not find the column Data in the input file. Aborting." << endl;
1726 return -1;
1727 }
1728
1729 numSlices /= 1440;
1730
1731 //set pointers to the readout data to later be able to gather it to "buffer".
1732 vector<void*> readPointers;
1733 vector<int32_t> readOffsets;
1734 vector<int32_t> readElemSize;
1735 vector<int32_t> readNumElems;
1736 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1737 {
1738 readPointers.push_back(inFile.SetPtrAddress(it->first));
1739 readOffsets.push_back(it->second.offset);
1740 readElemSize.push_back(it->second.size);
1741 readNumElems.push_back(it->second.num);
1742 }
1743
1744 //Convert each row one after the other
1745 for (uint32_t i=0;i<inFile.GetNumRows();i++)
1746 {
1747 if (displayText) cout << "\r Row " << i+1 << flush;
1748 inFile.GetNextRow();
1749 //copy from inFile internal structures to buffer
1750 int32_t count=0;
1751 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1752 {
1753 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1754 count++;
1755 }
1756
1757 if (startCellOffset != -1)
1758 {//drs calibrate this data
1759 for (int j=0;j<1440;j++)
1760 {
1761 const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1762 if (thisStartCell < 0) continue;
1763 for (int k=0;k<numSlices;k++)
1764 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1765 }
1766 }
1767 outFile.writeBinaryRow(buffer);
1768 };
1769
1770 //Get table name for later use in case the compressed file is to be verified
1771 string tableName = inFile.GetStr("EXTNAME");
1772
1773 if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1774
1775 inFile.close();
1776 if (!outFile.close())
1777 {
1778 cout << "Something went wrong while writing the catalog: negative index" << endl;
1779 return false;
1780 }
1781
1782 delete[] drsCalib16;
1783
1784 if (displayText) cout << "Done." << endl;
1785
1786 /************************************************************************************
1787 * Actual job done. Should we verify what we did ?
1788 ************************************************************************************/
1789
1790 if (verifyDataPlease)
1791 {
1792 if (displayText) cout << "Now verify data..." << endl;
1793 }
1794 else
1795 return 0;
1796
1797 //get a compressed reader
1798//TEMP try to copy the file too
1799// string copyName("/scratch/copyFile.fz");
1800 string copyName("");
1801 factfits verifyFile(fileNameOut, copyName, tableName, false);
1802
1803 //and the header of the compressed file
1804 const fits::Table::Keys& header2 = verifyFile.GetKeys();
1805
1806 //get a non-compressed writer
1807 ofits reconstructedFile;
1808
1809 //figure out its name: /dev/null unless otherwise specified
1810 string reconstructedName = fileNameOut+".recons";
1811 reconstructedName = "/dev/null";
1812 reconstructedFile.open(reconstructedName.c_str(), false);
1813
1814 //reconstruct the original columns from the compressed file.
1815 string origChecksumStr;
1816 string origDatasum;
1817
1818 //reset tablename value so that it is re-read from compressed table's header
1819 tableName = "";
1820
1821 /************************************************************************************
1822 * Reconstruction setup done. Rebuild original header
1823 ************************************************************************************/
1824
1825 //re-tranlate the keys
1826 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1827 {
1828 string k = it->first;
1829 if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1830 k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1831 k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1832 k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER")
1833 {
1834 continue;
1835 }
1836
1837 if (k == "ZCHKSUM")
1838 {
1839 reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1840 origChecksumStr = it->second.value;
1841 continue;
1842 }
1843
1844 if (k == "ZDTASUM")
1845 {
1846 reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
1847 origDatasum = it->second.value;
1848 continue;
1849 }
1850
1851 if (k == "EXTNAME")
1852 {
1853 tableName = it->second.value;
1854 }
1855
1856 k = k.substr(0,5);
1857
1858 if (k == "TTYPE")
1859 {//we have an original column name here.
1860 //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
1861 continue;
1862 }
1863
1864 if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
1865 {
1866 continue;
1867 }
1868
1869 if (k == "ZFORM" || k == "ZTYPE")
1870 {
1871 string tmpKey = it->second.fitsString;
1872 tmpKey[0] = 'T';
1873 reconstructedFile.SetKeyFromFitsString(tmpKey);
1874 continue;
1875 }
1876
1877 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
1878 }
1879
1880 if (tableName == "")
1881 {
1882 cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
1883 return -1;
1884 }
1885
1886 //Restore the original columns
1887 for (uint32_t numCol=1; numCol<10000; numCol++)
1888 {
1889 ostringstream str;
1890 str << numCol;
1891 if (!verifyFile.HasKey("TTYPE"+str.str())) break;
1892
1893 string ttype = verifyFile.GetStr("TTYPE"+str.str());
1894 string tform = verifyFile.GetStr("ZFORM"+str.str());
1895 char type = tform[tform.size()-1];
1896 string number = tform.substr(0, tform.size()-1);
1897 int numElems = atoi(number.c_str());
1898
1899 if (number == "") numElems=1;
1900
1901 reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
1902 }
1903
1904 reconstructedFile.WriteTableHeader(tableName.c_str());
1905
1906 /************************************************************************************
1907 * Original header restored. Do the data
1908 ************************************************************************************/
1909
1910 //set pointers to the readout data to later be able to gather it to "buffer".
1911 readPointers.clear();
1912 readOffsets.clear();
1913 readElemSize.clear();
1914 readNumElems.clear();
1915 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1916 {
1917 readPointers.push_back(verifyFile.SetPtrAddress(it->first));
1918 readOffsets.push_back(it->second.offset);
1919 readElemSize.push_back(it->second.size);
1920 readNumElems.push_back(it->second.num);
1921 }
1922
1923 //do the actual reconstruction work
1924 uint32_t i=1;
1925 while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
1926 {
1927 int count=0;
1928 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1929 {
1930 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1931 count++;
1932 }
1933 if (displayText) cout << "\r Row " << i << flush;
1934 reconstructedFile.WriteRow(buffer, rowWidth);
1935 i++;
1936 }
1937
1938 if (displayText) cout << endl;
1939
1940 //close reconstruction input and output
1941// verifyFile.close();
1942 reconstructedFile.close();
1943
1944 //get original and reconstructed checksum and datasum
1945 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
1946 std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
1947
1948 //verify that no mistake was made
1949 if (origChecksum.second != newChecksum.second)
1950 {
1951 cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
1952 return -1;
1953 }
1954 if (origChecksum.first != newChecksum.first)
1955 {
1956 cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
1957 }
1958 else
1959 {
1960 if (displayText) cout << "Ok" << endl;
1961 }
1962
1963 delete[] buffer;
1964 return 0;
1965
1966}
1967
Note: See TracBrowser for help on using the repository browser.