source: trunk/Mars/mcore/zofits.h@ 17216

Last change on this file since 17216 was 17216, checked in by lyard, 11 years ago
Added the brand new zofits class.
File size: 38.3 KB
Line 
1/*
2 * zofits.h
3 *
4 * FACT native compressed FITS writer
5 * Author: lyard
6 */
7
8#include "ofits.h"
9
10#ifndef __MARS__
11namespace std
12{
13#else
14using namespace std;
15#endif
16
17class zofits : public ofits
18{
19 public:
20
21 //This has been duplicated from zfits. Should be be located one level up ?
22 //If so, where ?
23 enum CompressionProcess_t
24 {
25 kFactRaw = 0x0,
26 kFactSmoothing = 0x1,
27 kFactHuffman16 = 0x2
28 };
29
30 enum RowOrdering_t
31 {
32 kOrderByCol = 'C',
33 kOrderByRow = 'R'
34 };
35
36 //TileHeaders are only written, but never read-back
37 //They are here to be able to recover raw data from binary if the header is corrupted
38 //Or to cross-check the data, if desired: the zfits method CheckIfFileIsConsistent can do this
39 struct TileHeader
40 {
41 char id[4];
42 uint32_t numRows;
43 uint64_t size;
44 TileHeader(uint32_t nRows=0,
45 uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
46 numRows(nRows),
47 size(s)
48 { };
49 } __attribute__((__packed__));
50
51 //BlockHeaders are written before every compressed blob of data
52 struct BlockHeader
53 {
54 uint64_t size;
55 char ordering;
56 unsigned char numProcs;
57 BlockHeader(uint64_t s=0,
58 char o=zfits::kOrderByRow,
59 unsigned char n=1) : size(s),
60 ordering(o),
61 numProcs(n)
62 {}
63 } __attribute__((__packed__)) ;
64
65
66 //constructors
67 zofits(uint32_t numTiles=1000,
68 uint32_t rowPerTile=100) : ofits()
69 {
70 InitMemberVariables(numTiles, rowPerTile);
71 SetNumWorkingThreads(1);
72 }
73
74 zofits(const char* fname,
75 uint32_t numTiles=1000,
76 uint32_t rowPerTile=100) : ofits(fname)
77 {
78 InitMemberVariables(numTiles, rowPerTile);
79 SetNumWorkingThreads(1);
80 }
81
82 ~zofits()
83 {
84 }
85
86 //initialization of member variables
87 void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0)
88 {
89 fCheckOffset = 0;
90 fThreadLooper = 0;
91
92 fNumTiles = nt;
93 fNumRowsPerTile = rpt;
94
95 fNumThreads = 1;
96 fThreadIndex = 0; ///< A variable to assign threads indices
97
98 fBuffer = NULL;
99 fRealRowWidth = 0;
100
101 fCatalogOffset = 0;
102 fStartCellsOffset = -1;
103 fDataOffset = -1;
104 }
105
106 //whether or not a calibration was given to the file writer
107 bool IsOffsetCalibrated()
108 {
109 return (fOffsetCalibration.size() != 0);
110 }
111
112 //assign a given drs offset calibration
113 void SetDrsCalibration(const float* calib)
114 {
115 if (!IsOffsetCalibrated())
116 fOffsetCalibration.resize(1440*1024);
117
118 for (uint32_t i=0;i<1440*1024;i++)
119 fOffsetCalibration[i] = (int16_t)(calib[i]*4096.f/2000.f);
120 }
121
122 void SetDrsCalibration(const vector<float>& calib)
123 {
124 if (calib.size() != 1440*1024)
125#ifdef __EXCEPTIONS
126 throw runtime_error("Cannot load calibration with anything else than 1024 samples per pixel");
127#else
128 gLog << ___err___ << "ERROR - Cannot load calibration with anything else than 1024 samples per pixel");
129#endif
130 SetDrsCalibration(calib.data());
131 }
132
133 void LoadDrsCalibrationFromFile(const string& fileName)
134 {
135 factfits drsFile(fileName);
136 float* drsCalibFloat = reinterpret_cast<float*>(drsFile.SetPtrAddress("BaselineMean"));
137
138 drsFile.GetNextRow();
139
140 SetDrsCalibration(drsCalibFloat);
141 }
142
143 //write the header of the binary table
144 bool WriteTableHeader(const char* name="DATA")
145 {
146 ofits::WriteTableHeader(name);
147
148 if (IsOffsetCalibrated())
149 {//retrieve the column storing the start cell offsets, if required.
150
151 for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)//Table.cols.begin(); it!= fTable.cols.end(); it++)
152 {
153 if (it->col.name == "StartCellData")
154 fStartCellsOffset = it->col.offset;
155 if (it->col.name == "Data")
156 {
157 fNumSlices = it->col.num;
158 fDataOffset = it->col.offset;
159 if (fNumSlices % 1440 != 0)
160 {
161#ifdef __EXCEPTIONS
162 throw runtime_error("Number of data samples not a multiple of 1440.");
163#else
164 gLog << ___err___ << "ERROR - Number of data samples not a multiple of 1440. Doing it uncalibrated." << endl;
165#endif
166 fOffsetCalibration.resize(0);
167 }
168 fNumSlices /= 1440;
169 }
170 }
171 if (fStartCellsOffset < 0)
172 {
173#ifdef __EXCEPTIONS
174 throw runtime_error("FACT Calibration requested, but \"StartCellData\" column not found.");
175#else
176 gLog << ___err___ << "ERROR - FACT Calibration requested, but \"StartCellData\" column not found. Doing it uncalibrated." << endl;
177#endif
178 //throw away the calibration data
179 fOffsetCalibration.resize(0);
180 }
181 if (fDataOffset < 0)
182 {
183#ifdef __EXCEPTIONS
184 throw runtime_error("FACT Calibration requested, but \"Data\" column not found.");
185#else
186 gLog << ___err___ << "ERROR - FACT Calibration requested, but \"Data\" column not found. Doing it uncalibrated." << endl;
187#endif
188 //throw away the calibration data
189 fOffsetCalibration.resize(0);
190 }
191 }
192 }
193
194 void open(const char* filename, bool addEXTNAMEKey=true)
195 {
196 ofits::open(filename, addEXTNAMEKey);
197
198 //add compression-related header entries
199 SetBool("ZTABLE", true, "Table is compressed");
200 SetInt("ZNAXIS1", 0, "Width of uncompressed rows");
201 SetInt("ZNAXIS2", 0, "Number of uncompressed rows");
202 SetInt("ZPCOUNT", 0, "");
203 SetInt("ZHEAPPTR", 0, "");
204 SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
205 SetInt("THEAP", 0, "");
206 SetStr("RAWSUM", " 0", "Checksum of raw littlen endian data");
207
208
209 fRawSum.reset();
210
211 //start the compression threads
212 pthread_mutex_init(&fMutex, NULL);
213
214 fThreadIndex = 0;
215 for (uint32_t i=0;i<fNumThreads;i++)
216 pthread_create(&(fThread[i]), NULL, threadFunction, this);
217
218 //wait for all threads to have started
219 while (fNumThreads != fThreadIndex)
220 usleep(1000);
221
222 //set the writing fence to the last thread (so that the first one can start writing right away)
223 fThreadIndex = fNumThreads-1;
224 }
225
226 bool WriteDrsOffsetsTable()
227 {
228 if (!IsOffsetCalibrated())
229 return false;
230
231 ofits c;
232 c.SetStr("XTENSION", "BINTABLE" , "binary table extension");
233 c.SetInt("BITPIX" , 8 , "8-bit bytes");
234 c.SetInt("NAXIS" , 2 , "2-dimensional binary table");
235 c.SetInt("NAXIS1" , 1024*1440*2 , "width of table in bytes");
236 c.SetInt("NAXIS2" , 1 , "number of rows in table");
237 c.SetInt("PCOUNT" , 0 , "size of special data area");
238 c.SetInt("GCOUNT" , 1 , "one data group (required keyword)");
239 c.SetInt("TFIELDS" , 1 , "number of fields in each row");
240 c.SetStr("CHECKSUM", "0000000000000000" , "Checksum for the whole HDU");
241 c.SetStr("DATASUM" , " 0" , "Checksum for the data block");
242 c.SetStr("EXTNAME" , "ZDrsCellOffsets" , "name of this binary table extension");
243 c.SetStr("TTYPE1" , "OffsetCalibration" , "label for field 1");
244 c.SetStr("TFORM1" , "1474560I" , "data format of field: 2-byte INTEGER");
245 c.End();
246
247 vector<char> swappedOffsets;
248 swappedOffsets.resize(1024*1440*sizeof(int16_t));
249 revcpy<sizeof(int16_t)>(swappedOffsets.data(), (char*)(fOffsetCalibration.data()), 1024*1440);
250
251 Checksum datasum;
252 datasum.add(swappedOffsets.data(), sizeof(int16_t)*1024*1440);
253
254 ostringstream dataSumStr;
255 dataSumStr << datasum.val();
256 c.SetStr("DATASUM", dataSumStr.str());
257
258 datasum += c.WriteHeader(*this);
259
260 const off_t here_I_am = tellp();
261
262 c.SetStr("CHECKSUM", datasum.str());
263 c.WriteHeader(*this);
264
265 seekp(here_I_am);
266
267 write(swappedOffsets.data(), swappedOffsets.size());
268
269 AlignTo2880Bytes();
270
271 return good();
272 }
273
274 uint32_t GetBytesPerRow() const
275 {
276 return fRealRowWidth;
277 }
278
279 bool WriteCatalog()
280 {
281 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
282 const uint32_t total_catalog_size = fCatalog.size()*one_catalog_row_size;
283
284 vector<char> swapped_catalog(total_catalog_size);
285 uint32_t shift = 0;
286 for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
287 {
288 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
289 shift += one_catalog_row_size;
290 }
291
292 if (fCatalogOffset == 0)
293 {
294 fCatalogOffset = tellp();
295 }
296
297 const off_t where_are_we = tellp();
298
299 seekp(fCatalogOffset);
300 write(swapped_catalog.data(), total_catalog_size);
301 if (where_are_we != fCatalogOffset)
302 seekp(where_are_we);
303
304 fCatalogSum.reset();
305 fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
306
307 return good();
308 }
309
310 bool WriteRow(const void* ptr, size_t cnt, bool byte_swap=true)
311 {
312 if (cnt != fRealRowWidth)
313 {
314#ifdef __EXCEPTIONS
315 throw runtime_error("Wrong size of row given to WriteRow");
316#else
317 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
318 return false;
319#endif
320 }
321
322 if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
323 {
324#ifdef __EXCEPTIONS
325 throw runtime_error("Maximum number of rows exceeded for this file");
326#else
327 gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
328 return false;
329#endif
330 }
331
332 //copy current row to pool or rows waiting for compression
333 char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
334 memcpy(target_location, ptr, fRealRowWidth);
335
336 //for now, make an extra copy of the data, for RAWSUM checksuming.
337 //Ideally this should be moved to the threads, along with the drs-offset-calibration
338 //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
339 //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
340 const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
341 char* buffer = fRawSumBuffer.data() + rawOffset;
342 auto ib = fRawSumBuffer.begin();
343 auto ie = fRawSumBuffer.rbegin();
344 *ib++ = 0;
345 *ib++ = 0;
346 *ib++ = 0;
347 *ib = 0;
348
349 *ie++ = 0;
350 *ie++ = 0;
351 *ie++ = 0;
352 *ie = 0;
353
354 memcpy(buffer, ptr, fRealRowWidth);
355
356 fRawSum.add(fRawSumBuffer, false);
357
358 if (IsOffsetCalibrated())
359 {
360
361 int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
362 int16_t* data = reinterpret_cast<int16_t*>(target_location + fDataOffset);
363
364 for (uint32_t ch=0; ch<1440; ch++)
365 {
366 if (startCell[ch] < 0)
367 {
368 data += fNumSlices;
369 continue;
370 }
371
372 const int16_t modStart = startCell[ch]%1024;
373 const int16_t *off = fOffsetCalibration.data() + ch*1024;
374
375 const int16_t* cal = off+modStart;
376 const int16_t* end_stride = data+fNumSlices;
377
378 if (modStart+fNumSlices > 1024)
379 {
380 while (cal < off+1024)
381 *data++ -= *cal++;
382 cal = off;
383 }
384
385 while (data<end_stride)
386 *data++ -= *cal++;
387 }
388 }
389
390 fTable.num_rows++;
391
392 if (fTable.num_rows % fNumRowsPerTile == 0)
393 {//give a new tile to compress to a thread
394 while (fThreadStatus[fThreadLooper] == _THREAD_COMPRESS_)
395 usleep(100000);
396
397 copyTransposeTile(fThreadLooper);
398
399 while (fThreadStatus[fThreadLooper] != _THREAD_WAIT_)
400 usleep(100000);
401
402 fThreadNumRows[fThreadLooper] = fTable.num_rows;
403 fThreadStatus[fThreadLooper] = _THREAD_COMPRESS_;
404 fThreadLooper = (fThreadLooper+1)%fNumThreads;
405 }
406
407 return true;
408 }
409
410 void FlushNumRows()
411 {
412 SetInt("NAXIS2", fTable.num_rows/fNumRowsPerTile);
413 SetInt("ZNAXIS2", fTable.num_rows);
414 FlushHeader();
415 }
416
417 bool close()
418 {
419 if (tellp() < 0)
420 return false;
421
422 //wait for compression threads to finish
423 for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
424 while (*it != _THREAD_WAIT_)
425 usleep(100000);
426
427 for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
428 *it = _THREAD_EXIT_;
429
430 for (auto it=fThread.begin(); it!= fThread.end(); it++)
431 pthread_join(*it, NULL);
432
433 pthread_mutex_destroy(&fMutex);
434
435 if (fTable.num_rows%fNumRowsPerTile != 0)
436 {
437 copyTransposeTile(0);
438 fThreadNumRows[0] = fTable.num_rows;
439 uint32_t numBytes = compressBuffer(0);
440 writeCompressedDataToDisk(0, numBytes);
441 }
442
443 AlignTo2880Bytes();
444
445 //update header keywords
446 SetInt("ZNAXIS1", fRealRowWidth);
447 SetInt("ZNAXIS2", fTable.num_rows);
448
449 uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
450 SetInt("ZHEAPPTR", heap_offset);
451// SetInt("THEAP" , heap_offset);0
452
453 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
454
455 SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
456
457 SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
458 SetInt("NAXIS2", total_num_tiles_written);
459
460 ostringstream str;
461 str << fRawSum.val();
462 SetStr("RAWSUM", str.str());
463
464 int64_t heap_size = 0;
465 int64_t compressed_offset = 0;
466
467 for (uint32_t i=0; i<total_num_tiles_written; i++)
468 {
469 compressed_offset += sizeof(TileHeader);
470 heap_size += sizeof(TileHeader);
471 for (uint32_t j=0; j<fCatalog[i].size(); j++)
472 {
473 heap_size += fCatalog[i][j].first;
474 fCatalog[i][j].second = compressed_offset;
475 compressed_offset += fCatalog[i][j].first;
476 if (fCatalog[i][j].first == 0)
477 fCatalog[i][j].second = 0;
478 }
479 }
480
481 //add to the heap size the size of the gap between the catalog and the actual heap
482 heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
483
484 SetInt("PCOUNT", heap_size, "size of special data area");
485
486 //Just for updating the fCatalogSum value
487 WriteCatalog();
488
489 fDataSum += fCatalogSum;
490
491 const Checksum checksm = UpdateHeaderChecksum();
492
493 ofstream::close();
494
495 if ((checksm+fDataSum).valid())
496 return true;
497
498 ostringstream sout;
499 sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
500#ifdef __EXCEPTIONS
501 throw runtime_error(sout.str());
502#else
503 gLog << ___err___ << "ERROR - " << sout.str() << endl;
504 return false;
505#endif
506 }
507
508 //Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
509 bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
510 {
511 BlockHeader head;
512 vector<uint16_t> processing(1);
513 processing[0] = kFactRaw;
514 AddColumn(cnt, typechar, name, unit, head, processing, comment, addHeaderKeys);
515 }
516
517 bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, BlockHeader& header, vector<uint16_t>& comp_sequence, const string& comment="", bool addHeaderKeys=true)
518 {
519 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
520 return false;
521
522 Table::Column col;
523 size_t size = SizeFromType(typechar);
524
525 col.name = name;
526 col.type = typechar;
527 col.num = cnt;
528 col.size = size;
529 col.offset = fRealRowWidth;
530
531 fRealRowWidth += size*cnt;
532
533 fRealColumns.emplace_back(CompressedColumn(col, header, comp_sequence));
534
535 ostringstream strKey, strVal, strCom;
536 strKey << "ZFORM" << fRealColumns.size();
537 strVal << cnt << typechar;
538 strCom << "format of " << name << " [" << CommentFromType(typechar);
539 SetStr(strKey.str(), strVal.str(), strCom.str());
540
541 strKey.str("");
542 strVal.str("");
543 strCom.str("");
544 strKey << "ZCTYP" << fRealColumns.size();
545 strVal << "FACT";
546 strCom << "Comp. of FACT telescope";
547 SetStr(strKey.str(), strVal.str(), strCom.str());
548
549 return reallocateBuffers();
550 }
551
552 bool SetNumWorkingThreads(uint32_t num)
553 {
554 if (is_open())
555 {
556#ifdef __EXCEPTIONS
557 throw runtime_error("File must be closed before changing the number of compression threads");
558#else
559 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads");
560#endif
561 return false;
562 }
563 if (num < 1 || num > 64)
564 {
565#ifdef __EXCEPTIONS
566 throw runtime_error("Number of threads must be between 1 and 64");
567#else
568 gLog << ___err___ << "ERROR - Number of threads must be between 1 and 64");
569#endif
570 return false;
571 }
572
573 fNumThreads = num;
574 fThreadStatus.resize(num);
575 fThread.resize(num);
576 fThreadNumRows.resize(num);
577 for (uint32_t i=0;i<num;i++)
578 {
579 fThreadNumRows[i] = 0;
580 fThreadStatus[i] = _THREAD_WAIT_;
581 }
582
583 return reallocateBuffers();
584 }
585
586 private:
587
588
589 bool reallocateBuffers()
590 {
591 fBufferVector.resize(fRealRowWidth*fNumRowsPerTile + 8); //8 more for checksuming
592 memset(fBufferVector.data(), 0, 4);
593 fBuffer = fBufferVector.data()+4;
594
595 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
596
597
598 fTransposedBufferVector.resize(fNumThreads);
599 fCompressedBufferVector.resize(fNumThreads);
600 fTransposedBuffer.resize(fNumThreads);
601 fCompressedBuffer.resize(fNumThreads);
602 for (uint32_t i=0;i<fNumThreads;i++)
603 {
604 fTransposedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile);
605 fCompressedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile + fRealColumns.size() + sizeof(TileHeader) + 8);
606 memset(fCompressedBufferVector[i].data(), 0, 4);
607 TileHeader tileHeader;
608 memcpy(fCompressedBufferVector[i].data()+4, &tileHeader, sizeof(TileHeader));
609 fTransposedBuffer[i] = fTransposedBufferVector[i].data();
610 fCompressedBuffer[i] = fCompressedBufferVector[i].data()+4;
611 }
612
613 //give the catalog enough space
614 fCatalog.resize(fNumTiles);
615 for (uint32_t i=0;i<fNumTiles;i++)
616 {
617 fCatalog[i].resize(fRealColumns.size());
618 for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
619 *it = CatalogEntry(0,0);
620 }
621 return true;
622 }
623
624 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
625 {
626 char* checkSumPointer = fCompressedBuffer[threadID];
627 int32_t extraBytes = 0;
628 uint32_t sizeToChecksum = sizeToWrite;
629 if (fCheckOffset != 0)
630 {//should we extend the array to the left ?
631 sizeToChecksum += fCheckOffset;
632 checkSumPointer -= fCheckOffset;
633 memset(checkSumPointer, 0, fCheckOffset);
634 }
635 if (sizeToChecksum%4 != 0)
636 {//should we extend the array to the right ?
637 extraBytes = 4 - (sizeToChecksum%4);
638 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
639 sizeToChecksum += extraBytes;
640 }
641
642 //do the checksum
643 fDataSum.add(checkSumPointer, sizeToChecksum);
644
645 fCheckOffset = (4 - extraBytes)%4;
646 //write data to disk
647 write(fCompressedBuffer[threadID], sizeToWrite);
648 return good();
649 }
650
651 static void* threadFunction(void* context)
652 {
653 zofits* myself = static_cast<zofits*>(context);
654
655 uint32_t myID = 0;
656 pthread_mutex_lock(&(myself->fMutex));
657 myID = myself->fThreadIndex++;
658 pthread_mutex_unlock(&(myself->fMutex));
659 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->fNumThreads-1 : myID-1;
660
661 while (myself->fThreadStatus[myID] != _THREAD_EXIT_)
662 {
663 while (myself->fThreadStatus[myID] == _THREAD_WAIT_)
664 usleep(100000);
665
666 if (myself->fThreadStatus[myID] != _THREAD_COMPRESS_)
667 continue;
668 uint32_t numBytes = myself->compressBuffer(myID);
669 myself->fThreadStatus[myID] = _THREAD_WRITE_;
670
671 //wait for the previous data to be written
672 while (myself->fThreadIndex != threadToWaitForBeforeWriting)
673 usleep(1000);
674 //do the actual writing to disk
675 pthread_mutex_lock(&(myself->fMutex));
676 myself->writeCompressedDataToDisk(myID, numBytes);
677 myself->fThreadIndex = myID;
678 pthread_mutex_unlock(&(myself->fMutex));
679 myself->fThreadStatus[myID] = _THREAD_WAIT_;
680 }
681 return NULL;
682 }
683
684 uint64_t compressBuffer(uint32_t threadIndex)
685 {
686 uint32_t thisRoundNumRows = (fThreadNumRows[threadIndex]%fNumRowsPerTile) ? fThreadNumRows[threadIndex]%fNumRowsPerTile : fNumRowsPerTile;
687 uint32_t offset=0;
688 uint32_t currentCatalogRow = (fThreadNumRows[threadIndex]-1)/fNumRowsPerTile;
689 uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
690
691 //now compress each column one by one by calling compression on arrays
692 for (uint32_t i=0;i<fRealColumns.size();i++)
693 {
694 fCatalog[currentCatalogRow][i].second = compressedOffset;
695
696 if (fRealColumns[i].col.num == 0) continue;
697
698 BlockHeader& head = fRealColumns[i].head;
699 const vector<uint16_t>& sequence = fRealColumns[i].comp_sequence;
700
701 //set the default byte telling if uncompressed the compressed Flag
702 uint64_t previousOffset = compressedOffset;
703 //skip header data
704 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
705
706 for (uint32_t j=0;j<sequence.size(); j++)
707 {
708 switch (sequence[j])
709 {
710 case zfits::kFactRaw:
711 compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
712 &(fTransposedBuffer[threadIndex][offset]),
713 thisRoundNumRows,
714 fRealColumns[i].col.size,
715 fRealColumns[i].col.num);
716 break;
717 case zfits::kFactSmoothing:
718 applySMOOTHING(&(fCompressedBuffer[threadIndex][compressedOffset]),
719 &(fTransposedBuffer[threadIndex][offset]),
720 thisRoundNumRows,
721 fRealColumns[i].col.size,
722 fRealColumns[i].col.num);
723 break;
724 case zfits::kFactHuffman16:
725 if (head.ordering == zfits::kOrderByCol)
726 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
727 &(fTransposedBuffer[threadIndex][offset]),
728 thisRoundNumRows,
729 fRealColumns[i].col.size,
730 fRealColumns[i].col.num);
731 else
732 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
733 &(fTransposedBuffer[threadIndex][offset]),
734 fRealColumns[i].col.num,
735 fRealColumns[i].col.size,
736 thisRoundNumRows);
737 break;
738 default:
739 cout << "ERROR: Unkown compression sequence entry: " << sequence[j] << endl;
740 break;
741 }
742 }
743
744 //check if compressed size is larger than uncompressed
745 if (sequence[0] != zfits::kFactRaw &&
746 compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
747 {//if so set flag and redo it uncompressed
748 //cout << "REDOING UNCOMPRESSED" << endl;
749 compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
750 compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]), &(fTransposedBuffer[threadIndex][offset]), thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
751 BlockHeader he;
752 he.size = compressedOffset - previousOffset;
753 he.numProcs = 1;
754 he.ordering = zfits::kOrderByRow;
755 memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
756 fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
757 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
758 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
759 continue;
760 }
761 head.size = compressedOffset - previousOffset;
762 memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
763 memcpy(&(fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
764
765 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
766 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
767 }
768
769 TileHeader tHead(thisRoundNumRows, compressedOffset);
770 memcpy(fCompressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
771 return compressedOffset;
772 }
773
774 void copyTransposeTile(uint32_t index)
775 {
776 uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
777
778 //copy the tile and transpose it
779 uint32_t offset = 0;
780 for (uint32_t i=0;i<fRealColumns.size();i++)
781 {
782 switch (fRealColumns[i].head.ordering)
783 {
784 case zfits::kOrderByRow:
785 for (uint32_t k=0;k<thisRoundNumRows;k++)
786 {//regular, "semi-transposed" copy
787 memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset], fRealColumns[i].col.size*fRealColumns[i].col.num);
788 offset += fRealColumns[i].col.size*fRealColumns[i].col.num;
789 }
790 break;
791
792 case zfits::kOrderByCol :
793 for (int j=0;j<fRealColumns[i].col.num;j++)
794 for (uint32_t k=0;k<thisRoundNumRows;k++)
795 {//transposed copy
796 memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset + fRealColumns[i].col.size*j], fRealColumns[i].col.size);
797 offset += fRealColumns[i].col.size;
798 }
799 break;
800 default:
801 cout << "Error: unknown column ordering: " << fRealColumns[i].head.ordering << endl;
802
803 };
804 }
805 }
806
807 /// Specific compression functions
808 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
809 {
810 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
811 return numRows*sizeOfElems*numRowElems;
812 }
813
814 uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
815 {
816 string huffmanOutput;
817 uint32_t previousHuffmanSize = 0;
818 if (numRows < 2)
819 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
820 return numRows*sizeOfElems*numRowElems + 1000;
821 }
822 if (sizeOfElems < 2 )
823 {
824 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
825 return 0;
826 }
827 uint32_t huffmanOffset = 0;
828 for (uint32_t j=0;j<numRowElems;j++)
829 {
830 Huffman::Encode(huffmanOutput,
831 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
832 numRows*(sizeOfElems/2));
833 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
834 huffmanOffset += sizeof(uint32_t);
835 previousHuffmanSize = huffmanOutput.size();
836 }
837 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
838
839 //only copy if not larger than not-compressed size
840 if (totalSize < numRows*sizeOfElems*numRowElems)
841 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
842
843 return totalSize;
844 }
845
846 uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
847 {
848 uint32_t colWidth = numRowElems;
849 for (int j=colWidth*numRows-1;j>1;j--)
850 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
851 //call the huffman transposed
852 return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
853 }
854
855 uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
856 {
857 uint32_t colWidth = numRowElems;
858 for (int j=colWidth*numRows-1;j>1;j--)
859 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
860
861 return numRows*sizeOfElems*numRowElems;
862 }
863
864 //Offsets calibration stuff.
865 vector<int16_t> fOffsetCalibration; ///< The calibration itself
866 int32_t fStartCellsOffset; ///< Offset in bytes for the startcell data
867 int32_t fDataOffset; ///< Offset in bytes for the data
868 int32_t fNumSlices; ///< Number of samples per pixel per event
869
870 //Compressed data stuff
871 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
872 uint32_t fNumTiles;
873 uint32_t fNumRowsPerTile;
874
875
876 //thread related stuff
877 uint32_t fNumThreads; ///< The number of threads that will be used to compress
878 uint32_t fThreadIndex; ///< A variable to assign threads indices
879 vector<pthread_t> fThread; ///< The thread handler of the compressor
880 vector<uint32_t> fThreadNumRows; ///< Total number of rows for thread to compresswww.;wwwwww
881 vector<uint32_t> fThreadStatus; ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
882 int32_t fThreadLooper; ///< Which thread will deal with the upcoming bunch of data ?
883 pthread_mutex_t fMutex; ///< mutex for compressing threads
884
885 struct CatalogEntry
886 {
887 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
888 int64_t first;
889 int64_t second;
890 } __attribute__((__packed__));
891
892 // typedef pair<int64_t, int64_t> CatalogEntry;
893 typedef vector<CatalogEntry> CatalogRow;
894 typedef vector<CatalogRow> CatalogType;
895 CatalogType fCatalog;
896 Checksum fCatalogSum;
897 Checksum fRawSum;
898 off_t fCatalogOffset;
899 vector<char> fBufferVector;
900 vector<char> fRawSumBuffer;
901 vector<vector<char>> fTransposedBufferVector;
902 vector<vector<char>> fCompressedBufferVector;
903 char* fBuffer;
904 vector<char*> fTransposedBuffer;
905 vector<char*> fCompressedBuffer;
906 uint32_t fRealRowWidth;
907 struct CompressedColumn
908 {
909 CompressedColumn(Table::Column& c, BlockHeader& h, vector<uint16_t>& cs) : col(c),
910 head(h),
911 comp_sequence(cs)
912 {}
913 Table::Column col;
914 BlockHeader head;
915 vector<uint16_t> comp_sequence;
916 };
917 vector<CompressedColumn> fRealColumns;
918
919 //thread states. Not all used, but they do not hurt
920 static const uint32_t _THREAD_WAIT_ = 0; ///< Thread doing nothing
921 static const uint32_t _THREAD_COMPRESS_ = 1; ///< Thread working, compressing
922 static const uint32_t _THREAD_DECOMPRESS_ = 2; ///< Thread working, decompressing
923 static const uint32_t _THREAD_WRITE_ = 3; ///< Thread writing data to disk
924 static const uint32_t _THREAD_READ_ = 4; ///< Thread reading data from disk
925 static const uint32_t _THREAD_EXIT_ = 5; ///< Thread exiting
926
927};
928
929#ifndef __MARS__
930}; //namespace std
931#endif
932
933#ifdef crappy_example_usage
934zofits zofitsfile(123456, 100);
935zofitsfile.SetNumWorkingThreads(numThreads);
936zofitsfile.open((fileNameOut).c_str());
937std::zofits::BlockHeader zoheader(0, zfits::kOrderByRow, 2);
938vector<uint16_t> smoothmanProcessings(2);
939smoothmanProcessings[0] = zfits::kFactSmoothing;
940smoothmanProcessings[1] = zfits::kFactHuffman16;
941
942zofitsfile.AddColumn(sortedColumns[i].num,
943 sortedColumns[i].type,
944 colName,
945 "");
946
947zofitsfile.AddColumn(sortedColumns[i].num,
948 sortedColumns[i].type,
949 colName,
950 "",
951 zoheader,
952 smoothmanProcessings);
953
954zofitsfile.SetStr("ZCHKSUM", i->second.value, i->second.comment);
955zofitsfile.SetDrsCalibration(drsCalibFloat);
956zofitsfile.WriteTableHeader(tableName.c_str());
957zofitsfile.WriteRow(buffer, rowWidth);
958zofitsfile.close();
959
960#endif
Note: See TracBrowser for help on using the repository browser.