| 1 | #ifndef FACT_zofits
 | 
|---|
| 2 | #define FACT_zofits
 | 
|---|
| 3 | 
 | 
|---|
| 4 | /*
 | 
|---|
| 5 |  * zofits.h
 | 
|---|
| 6 |  *
 | 
|---|
| 7 |  *  FACT native compressed FITS writer
 | 
|---|
| 8 |  *      Author: lyard
 | 
|---|
| 9 |  */
 | 
|---|
| 10 | 
 | 
|---|
| 11 | #include "ofits.h"
 | 
|---|
| 12 | #include "huffman.h"
 | 
|---|
| 13 | #include "Queue.h"
 | 
|---|
| 14 | #include "MemoryManager.h"
 | 
|---|
| 15 | 
 | 
|---|
| 16 | #ifdef HAVE_BOOST_THREAD
 | 
|---|
| 17 | #include <boost/thread.hpp>
 | 
|---|
| 18 | #endif
 | 
|---|
| 19 | 
 | 
|---|
| 20 | class zofits : public ofits
 | 
|---|
| 21 | {
 | 
|---|
| 22 |         /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
 | 
|---|
| 23 |         template<class S>
 | 
|---|
| 24 |         struct QueueMin : std::list<S>
 | 
|---|
| 25 |         {
 | 
|---|
| 26 |             typename std::list<S>::iterator begin()
 | 
|---|
| 27 |             {
 | 
|---|
| 28 |                 return min_element(std::list<S>::begin(), std::list<S>::end());
 | 
|---|
| 29 |             }
 | 
|---|
| 30 |         };
 | 
|---|
| 31 | 
 | 
|---|
| 32 | 
 | 
|---|
| 33 |         //catalog types
 | 
|---|
| 34 |         struct CatalogEntry
 | 
|---|
| 35 |         {
 | 
|---|
| 36 |             CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
 | 
|---|
| 37 |             int64_t first;   ///< Size of this column in the tile
 | 
|---|
| 38 |             int64_t second;  ///< offset of this column in the tile, from the start of the heap area
 | 
|---|
| 39 |         } __attribute__((__packed__));
 | 
|---|
| 40 | 
 | 
|---|
| 41 |         typedef std::vector<CatalogEntry> CatalogRow;
 | 
|---|
| 42 |         typedef std::list<CatalogRow>     CatalogType;
 | 
|---|
| 43 | 
 | 
|---|
| 44 | 
 | 
|---|
| 45 |         /// Parameters required to write a tile to disk
 | 
|---|
| 46 |         struct WriteTarget
 | 
|---|
| 47 |         {
 | 
|---|
| 48 |             bool operator < (const WriteTarget& other) const
 | 
|---|
| 49 |             {
 | 
|---|
| 50 |                 return tile_num < other.tile_num;
 | 
|---|
| 51 |             }
 | 
|---|
| 52 | 
 | 
|---|
| 53 |             WriteTarget() { }
 | 
|---|
| 54 |             WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
 | 
|---|
| 55 | 
 | 
|---|
| 56 |             uint32_t              tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
 | 
|---|
| 57 |             uint32_t              size;     ///< Size to write
 | 
|---|
| 58 |             std::shared_ptr<char> data;     ///< Memory block to write
 | 
|---|
| 59 |         };
 | 
|---|
| 60 | 
 | 
|---|
| 61 | 
 | 
|---|
| 62 |         /// Parameters required to compress a tile of data
 | 
|---|
| 63 |         struct CompressionTarget
 | 
|---|
| 64 |         {
 | 
|---|
| 65 |             CompressionTarget(CatalogRow& r) : catalog_entry(r)
 | 
|---|
| 66 |             {}
 | 
|---|
| 67 | 
 | 
|---|
| 68 |             CatalogRow&           catalog_entry;   ///< Reference to the catalog entry to deal with
 | 
|---|
| 69 |             std::shared_ptr<char> src;             ///< Original data
 | 
|---|
| 70 |             std::shared_ptr<char> transposed_src;  ///< Transposed data
 | 
|---|
| 71 |             WriteTarget           target;          ///< Compressed data
 | 
|---|
| 72 |             uint32_t              num_rows;        ///< Number of rows to compress
 | 
|---|
| 73 |          };
 | 
|---|
| 74 | 
 | 
|---|
| 75 | public:
 | 
|---|
| 76 |         /// static setter for the default number of threads to use. -1 means all available physical cores
 | 
|---|
| 77 |         static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
 | 
|---|
| 78 |         static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
 | 
|---|
| 79 |         static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
 | 
|---|
| 80 |         static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
 | 
|---|
| 81 | 
 | 
|---|
| 82 |         /// constructors
 | 
|---|
| 83 |         /// @param numTiles how many data groups should be pre-reserved ?
 | 
|---|
| 84 |         /// @param rowPerTile how many rows will be grouped together in a single tile
 | 
|---|
| 85 |         /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
 | 
|---|
| 86 |         zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
 | 
|---|
| 87 |                uint32_t rowPerTile  = DefaultNumRowsPerTile(),
 | 
|---|
| 88 |                uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
 | 
|---|
| 89 |             fMemPool(0, maxUsableMem*1000),
 | 
|---|
| 90 |             fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
 | 
|---|
| 91 |         {
 | 
|---|
| 92 |             InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
 | 
|---|
| 93 |             SetNumThreads(DefaultNumThreads());
 | 
|---|
| 94 |         }
 | 
|---|
| 95 | 
 | 
|---|
| 96 |         /// @param fname the target filename
 | 
|---|
| 97 |         /// @param numTiles how many data groups should be pre-reserved ?
 | 
|---|
| 98 |         /// @param rowPerTile how many rows will be grouped together in a single tile
 | 
|---|
| 99 |         /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
 | 
|---|
| 100 |         zofits(const char* fname,
 | 
|---|
| 101 |                uint32_t numTiles    = DefaultMaxNumTiles(),
 | 
|---|
| 102 |                uint32_t rowPerTile  = DefaultNumRowsPerTile(),
 | 
|---|
| 103 |                uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
 | 
|---|
| 104 |                    fMemPool(0, maxUsableMem*1000),
 | 
|---|
| 105 |                    fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
 | 
|---|
| 106 |         {
 | 
|---|
| 107 |             InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
 | 
|---|
| 108 |             SetNumThreads(DefaultNumThreads());
 | 
|---|
| 109 |         }
 | 
|---|
| 110 | 
 | 
|---|
| 111 |         //initialization of member variables
 | 
|---|
| 112 |         /// @param nt number of tiles
 | 
|---|
| 113 |         /// @param rpt number of rows per tile
 | 
|---|
| 114 |         /// @param maxUsableMem max amount of RAM to be used by the compression buffers
 | 
|---|
| 115 |         void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
 | 
|---|
| 116 |         {
 | 
|---|
| 117 |             fCheckOffset = 0;
 | 
|---|
| 118 |             fNumQueues   = 0;
 | 
|---|
| 119 | 
 | 
|---|
| 120 |             fNumTiles       = nt==0 ? 1 : nt;
 | 
|---|
| 121 |             fNumRowsPerTile = rpt;
 | 
|---|
| 122 | 
 | 
|---|
| 123 |             fRealRowWidth     = 0;
 | 
|---|
| 124 |             fCatalogOffset    = 0;
 | 
|---|
| 125 |             fCatalogSize      = 0;
 | 
|---|
| 126 | 
 | 
|---|
| 127 |             fMaxUsableMem = maxUsableMem;
 | 
|---|
| 128 | #ifdef __EXCEPTIONS
 | 
|---|
| 129 |             fThreadsException = std::exception_ptr();
 | 
|---|
| 130 | #endif
 | 
|---|
| 131 |             fErrno = 0;
 | 
|---|
| 132 |         }
 | 
|---|
| 133 | 
 | 
|---|
| 134 |         /// write the header of the binary table
 | 
|---|
| 135 |         /// @param name the name of the table to be created
 | 
|---|
| 136 |         /// @return the state of the file
 | 
|---|
| 137 |         virtual bool WriteTableHeader(const char* name="DATA")
 | 
|---|
| 138 |         {
 | 
|---|
| 139 |             reallocateBuffers();
 | 
|---|
| 140 | 
 | 
|---|
| 141 |             ofits::WriteTableHeader(name);
 | 
|---|
| 142 | 
 | 
|---|
| 143 |             fCompressionQueues.front().setPromptExecution(fNumQueues==0);
 | 
|---|
| 144 |             fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
 | 
|---|
| 145 | 
 | 
|---|
| 146 |             if (fNumQueues != 0)
 | 
|---|
| 147 |             {
 | 
|---|
| 148 |                 //start the compression queues
 | 
|---|
| 149 |                 for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
 | 
|---|
| 150 |                     it->start();
 | 
|---|
| 151 | 
 | 
|---|
| 152 |                 //start the disk writer
 | 
|---|
| 153 |                 fWriteToDiskQueue.start();
 | 
|---|
| 154 |             }
 | 
|---|
| 155 | 
 | 
|---|
| 156 |             //mark that no tile has been written so far
 | 
|---|
| 157 |             fLatestWrittenTile = -1;
 | 
|---|
| 158 | 
 | 
|---|
| 159 |             //no wiring error (in the writing of the data) has occured so far
 | 
|---|
| 160 |             fErrno = 0;
 | 
|---|
| 161 | 
 | 
|---|
| 162 |             return good();
 | 
|---|
| 163 |         }
 | 
|---|
| 164 | 
 | 
|---|
| 165 |         /// open a new file.
 | 
|---|
| 166 |         /// @param filename the name of the file
 | 
|---|
| 167 |         /// @param Whether or not the name of the extension should be added or not
 | 
|---|
| 168 |         void open(const char* filename, bool addEXTNAMEKey=true)
 | 
|---|
| 169 |         {
 | 
|---|
| 170 |             ofits::open(filename, addEXTNAMEKey);
 | 
|---|
| 171 | 
 | 
|---|
| 172 |             //add compression-related header entries
 | 
|---|
| 173 |             SetBool( "ZTABLE",   true,            "Table is compressed");
 | 
|---|
| 174 |             SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
 | 
|---|
| 175 |             SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
 | 
|---|
| 176 |             SetInt(  "ZPCOUNT",  0,               "");
 | 
|---|
| 177 |             SetInt(  "ZHEAPPTR", 0,               "");
 | 
|---|
| 178 |             SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
 | 
|---|
| 179 |             SetInt(  "THEAP",    0,               "");
 | 
|---|
| 180 |             SetStr(  "RAWSUM",   "         0",    "Checksum of raw little endian data");
 | 
|---|
| 181 |             SetFloat("ZRATIO",   0,               "Compression ratio");
 | 
|---|
| 182 |             SetInt(  "ZSHRINK",  1,               "Catalog shrink factor");
 | 
|---|
| 183 | 
 | 
|---|
| 184 |             fCatalogSize   = 0;
 | 
|---|
| 185 |             fRealRowWidth  = 0;
 | 
|---|
| 186 |             fCatalogOffset = 0;
 | 
|---|
| 187 |             fCatalogSize   = 0;
 | 
|---|
| 188 |             fCheckOffset   = 0;
 | 
|---|
| 189 | 
 | 
|---|
| 190 |             fRealColumns.clear();
 | 
|---|
| 191 |             fCatalog.clear();
 | 
|---|
| 192 |             fCatalogSum.reset();
 | 
|---|
| 193 |             fRawSum.reset();
 | 
|---|
| 194 |         }
 | 
|---|
| 195 | 
 | 
|---|
| 196 |         /// Super method. does nothing as zofits does not know about DrsOffsets
 | 
|---|
| 197 |         /// @return the state of the file
 | 
|---|
| 198 |         virtual bool WriteDrsOffsetsTable()
 | 
|---|
| 199 |         {
 | 
|---|
| 200 |             return good();
 | 
|---|
| 201 |         }
 | 
|---|
| 202 | 
 | 
|---|
| 203 |         /// Returns the number of bytes per uncompressed row
 | 
|---|
| 204 |         /// @return number of bytes per uncompressed row
 | 
|---|
| 205 |         uint32_t GetBytesPerRow() const
 | 
|---|
| 206 |         {
 | 
|---|
| 207 |             return fRealRowWidth;
 | 
|---|
| 208 |         }
 | 
|---|
| 209 | 
 | 
|---|
| 210 |         /// Write the data catalog
 | 
|---|
| 211 |         /// @return the state of the file
 | 
|---|
| 212 |         bool WriteCatalog()
 | 
|---|
| 213 |         {
 | 
|---|
| 214 |             const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
 | 
|---|
| 215 |             const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;
 | 
|---|
| 216 | 
 | 
|---|
| 217 |             // swap the catalog bytes before writing
 | 
|---|
| 218 |             std::vector<char> swapped_catalog(total_catalog_size);
 | 
|---|
| 219 | 
 | 
|---|
| 220 |             uint32_t shift = 0;
 | 
|---|
| 221 |             for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
 | 
|---|
| 222 |             {
 | 
|---|
| 223 |                 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
 | 
|---|
| 224 |                 shift += one_catalog_row_size;
 | 
|---|
| 225 |             }
 | 
|---|
| 226 | 
 | 
|---|
| 227 |             if (fCatalogSize < fNumTiles)
 | 
|---|
| 228 |                 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
 | 
|---|
| 229 | 
 | 
|---|
| 230 |             // first time writing ? remember where we are
 | 
|---|
| 231 |             if (fCatalogOffset == 0)
 | 
|---|
| 232 |                 fCatalogOffset = tellp();
 | 
|---|
| 233 | 
 | 
|---|
| 234 |             // remember where we came from
 | 
|---|
| 235 |             const off_t where_are_we = tellp();
 | 
|---|
| 236 | 
 | 
|---|
| 237 |             // write to disk
 | 
|---|
| 238 |             seekp(fCatalogOffset);
 | 
|---|
| 239 |             write(swapped_catalog.data(), total_catalog_size);
 | 
|---|
| 240 | 
 | 
|---|
| 241 |             if (where_are_we != fCatalogOffset)
 | 
|---|
| 242 |                 seekp(where_are_we);
 | 
|---|
| 243 | 
 | 
|---|
| 244 |             // udpate checksum
 | 
|---|
| 245 |             fCatalogSum.reset();
 | 
|---|
| 246 |             fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
 | 
|---|
| 247 | 
 | 
|---|
| 248 |             return good();
 | 
|---|
| 249 |         }
 | 
|---|
| 250 | 
 | 
|---|
| 251 |         /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
 | 
|---|
| 252 |         virtual void DrsOffsetCalibrate(char* )
 | 
|---|
| 253 |         {
 | 
|---|
| 254 | 
 | 
|---|
| 255 |         }
 | 
|---|
| 256 | 
 | 
|---|
| 257 |         CatalogRow& AddOneCatalogRow()
 | 
|---|
| 258 |         {
 | 
|---|
| 259 |             // add one row to the catalog
 | 
|---|
| 260 |             fCatalog.emplace_back(CatalogRow());
 | 
|---|
| 261 |             fCatalog.back().resize(fTable.num_cols);
 | 
|---|
| 262 |             for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
 | 
|---|
| 263 |                 *it = CatalogEntry(0,0);
 | 
|---|
| 264 | 
 | 
|---|
| 265 |             fCatalogSize++;
 | 
|---|
| 266 | 
 | 
|---|
| 267 |             return fCatalog.back();
 | 
|---|
| 268 |         }
 | 
|---|
| 269 | 
 | 
|---|
| 270 |         /// write one row of data
 | 
|---|
| 271 |         /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
 | 
|---|
| 272 |         /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
 | 
|---|
| 273 |         /// errno will then contain the correct error number of the last error which happened during writing.
 | 
|---|
| 274 |         /// @param ptr the source buffer
 | 
|---|
| 275 |         /// @param the number of bytes to write
 | 
|---|
| 276 |         /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
 | 
|---|
| 277 |         bool WriteRow(const void* ptr, size_t cnt, bool = true)
 | 
|---|
| 278 |         {
 | 
|---|
| 279 |             if (cnt != fRealRowWidth)
 | 
|---|
| 280 |             {
 | 
|---|
| 281 | #ifdef __EXCEPTIONS
 | 
|---|
| 282 |                 throw std::runtime_error("Wrong size of row given to WriteRow");
 | 
|---|
| 283 | #else
 | 
|---|
| 284 |                 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
 | 
|---|
| 285 |                 return false;
 | 
|---|
| 286 | #endif
 | 
|---|
| 287 |             }
 | 
|---|
| 288 | 
 | 
|---|
| 289 | #ifdef __EXCEPTIONS
 | 
|---|
| 290 |             //check if something hapenned while the compression threads were working
 | 
|---|
| 291 |             //if so, re-throw the exception that was generated
 | 
|---|
| 292 |             if (fThreadsException != std::exception_ptr())
 | 
|---|
| 293 |                 std::rethrow_exception(fThreadsException);
 | 
|---|
| 294 | #endif
 | 
|---|
| 295 | 
 | 
|---|
| 296 |             //copy current row to pool or rows waiting for compression
 | 
|---|
| 297 |             char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
 | 
|---|
| 298 |             memcpy(target_location, ptr, fRealRowWidth);
 | 
|---|
| 299 | 
 | 
|---|
| 300 |             //for now, make an extra copy of the data, for RAWSUM checksuming.
 | 
|---|
| 301 |             //Ideally this should be moved to the threads
 | 
|---|
| 302 |             //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
 | 
|---|
| 303 |             //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
 | 
|---|
| 304 |             const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
 | 
|---|
| 305 |             char* buffer = fRawSumBuffer.data() + rawOffset;
 | 
|---|
| 306 |             auto ib = fRawSumBuffer.begin();
 | 
|---|
| 307 |             auto ie = fRawSumBuffer.rbegin();
 | 
|---|
| 308 |             *ib++ = 0;
 | 
|---|
| 309 |             *ib++ = 0;
 | 
|---|
| 310 |             *ib++ = 0;
 | 
|---|
| 311 |             *ib   = 0;
 | 
|---|
| 312 | 
 | 
|---|
| 313 |             *ie++ = 0;
 | 
|---|
| 314 |             *ie++ = 0;
 | 
|---|
| 315 |             *ie++ = 0;
 | 
|---|
| 316 |             *ie   = 0;
 | 
|---|
| 317 | 
 | 
|---|
| 318 |             memcpy(buffer, ptr, fRealRowWidth);
 | 
|---|
| 319 | 
 | 
|---|
| 320 |             fRawSum.add(fRawSumBuffer, false);
 | 
|---|
| 321 | 
 | 
|---|
| 322 |             fTable.num_rows++;
 | 
|---|
| 323 | 
 | 
|---|
| 324 |             if (fTable.num_rows % fNumRowsPerTile != 0)
 | 
|---|
| 325 |             {
 | 
|---|
| 326 |                 errno = fErrno;
 | 
|---|
| 327 |                 return errno==0;
 | 
|---|
| 328 |             }
 | 
|---|
| 329 | 
 | 
|---|
| 330 |             // use the least occupied queue
 | 
|---|
| 331 |             const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
 | 
|---|
| 332 | 
 | 
|---|
| 333 |             if (!imin->emplace(InitNextCompression()))
 | 
|---|
| 334 |                 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
 | 
|---|
| 335 | 
 | 
|---|
| 336 |             errno = fErrno;
 | 
|---|
| 337 |             return errno==0;
 | 
|---|
| 338 |         }
 | 
|---|
| 339 | 
 | 
|---|
| 340 |         /// update the real number of rows
 | 
|---|
| 341 |         void FlushNumRows()
 | 
|---|
| 342 |         {
 | 
|---|
| 343 |             SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
 | 
|---|
| 344 |             SetInt("ZNAXIS2", fTable.num_rows);
 | 
|---|
| 345 |             FlushHeader();
 | 
|---|
| 346 |         }
 | 
|---|
| 347 | 
 | 
|---|
| 348 |         /// Setup the environment to compress yet another tile of data
 | 
|---|
| 349 |         /// @param target the struct where to host the produced parameters
 | 
|---|
| 350 |         CompressionTarget InitNextCompression()
 | 
|---|
| 351 |         {
 | 
|---|
| 352 |             CompressionTarget target(AddOneCatalogRow());
 | 
|---|
| 353 | 
 | 
|---|
| 354 |             //fill up compression target
 | 
|---|
| 355 |             target.src            = fSmartBuffer;
 | 
|---|
| 356 |             target.transposed_src = fMemPool.malloc();
 | 
|---|
| 357 |             target.num_rows       = fTable.num_rows;
 | 
|---|
| 358 | 
 | 
|---|
| 359 |             //fill up write to disk target
 | 
|---|
| 360 |             WriteTarget &write_target = target.target;
 | 
|---|
| 361 |             write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
 | 
|---|
| 362 |             write_target.size     = 0;
 | 
|---|
| 363 |             write_target.data     = fMemPool.malloc();
 | 
|---|
| 364 | 
 | 
|---|
| 365 |             //get a new buffer to host the incoming data
 | 
|---|
| 366 |             fSmartBuffer = fMemPool.malloc();
 | 
|---|
| 367 | 
 | 
|---|
| 368 |             return target;
 | 
|---|
| 369 |         }
 | 
|---|
| 370 | 
 | 
|---|
| 371 |         /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
 | 
|---|
| 372 |         const uint32_t ShrinkCatalog()
 | 
|---|
| 373 |         {
 | 
|---|
| 374 |             //add empty row to get either the target number of rows, or a multiple of the allowed size
 | 
|---|
| 375 |             for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
 | 
|---|
| 376 |                 AddOneCatalogRow();
 | 
|---|
| 377 | 
 | 
|---|
| 378 |             //did we write more rows than what the catalog could host ?
 | 
|---|
| 379 |             if (fCatalogSize <= fNumTiles) // nothing to do
 | 
|---|
| 380 |                 return 1;
 | 
|---|
| 381 | 
 | 
|---|
| 382 |             //always exact as extra rows were added just above
 | 
|---|
| 383 |             const uint32_t shrink_factor = fCatalogSize / fNumTiles; 
 | 
|---|
| 384 | 
 | 
|---|
| 385 |             //shrink the catalog !
 | 
|---|
| 386 |             uint32_t entry_id = 1;
 | 
|---|
| 387 |             auto it = fCatalog.begin();
 | 
|---|
| 388 |             it++;
 | 
|---|
| 389 |             for (; it != fCatalog.end(); it++)
 | 
|---|
| 390 |             {
 | 
|---|
| 391 |                 if (entry_id >= fNumTiles)
 | 
|---|
| 392 |                     break;
 | 
|---|
| 393 | 
 | 
|---|
| 394 |                 const uint32_t target_id = entry_id*shrink_factor;
 | 
|---|
| 395 | 
 | 
|---|
| 396 |                 auto jt = it;
 | 
|---|
| 397 |                 for (uint32_t i=0; i<target_id-entry_id; i++)
 | 
|---|
| 398 |                     jt++;
 | 
|---|
| 399 | 
 | 
|---|
| 400 |                 *it = *jt;
 | 
|---|
| 401 | 
 | 
|---|
| 402 |                 entry_id++;
 | 
|---|
| 403 |             }
 | 
|---|
| 404 | 
 | 
|---|
| 405 |             const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
 | 
|---|
| 406 | 
 | 
|---|
| 407 |             //remove the too many entries
 | 
|---|
| 408 |             for (uint32_t i=0;i<num_tiles_to_remove;i++)
 | 
|---|
| 409 |             {
 | 
|---|
| 410 |                 fCatalog.pop_back();
 | 
|---|
| 411 |                 fCatalogSize--;
 | 
|---|
| 412 |             }
 | 
|---|
| 413 | 
 | 
|---|
| 414 |             //update header keywords
 | 
|---|
| 415 |             fNumRowsPerTile *= shrink_factor;
 | 
|---|
| 416 | 
 | 
|---|
| 417 |             SetInt("ZTILELEN", fNumRowsPerTile);
 | 
|---|
| 418 |             SetInt("ZSHRINK",  shrink_factor);
 | 
|---|
| 419 | 
 | 
|---|
| 420 |             return shrink_factor;
 | 
|---|
| 421 |         }
 | 
|---|
| 422 | 
 | 
|---|
| 423 |         /// close an open file.
 | 
|---|
| 424 |         /// @return the state of the file
 | 
|---|
| 425 |         bool close()
 | 
|---|
| 426 |         {
 | 
|---|
| 427 |             // stop compression and write threads
 | 
|---|
| 428 |             for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
 | 
|---|
| 429 |                 it->wait();
 | 
|---|
| 430 | 
 | 
|---|
| 431 |             fWriteToDiskQueue.wait();
 | 
|---|
| 432 | 
 | 
|---|
| 433 |             if (tellp() < 0)
 | 
|---|
| 434 |                 return false;
 | 
|---|
| 435 | 
 | 
|---|
| 436 | #ifdef __EXCEPTIONS
 | 
|---|
| 437 |             //check if something hapenned while the compression threads were working
 | 
|---|
| 438 |             //if so, re-throw the exception that was generated
 | 
|---|
| 439 |             if (fThreadsException != std::exception_ptr())
 | 
|---|
| 440 |                 std::rethrow_exception(fThreadsException);
 | 
|---|
| 441 | #endif
 | 
|---|
| 442 | 
 | 
|---|
| 443 |             //write the last tile of data (if any)
 | 
|---|
| 444 |             if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
 | 
|---|
| 445 |             {
 | 
|---|
| 446 |                 fWriteToDiskQueue.enablePromptExecution();
 | 
|---|
| 447 |                 fCompressionQueues.front().enablePromptExecution();
 | 
|---|
| 448 |                 fCompressionQueues.front().emplace(InitNextCompression());
 | 
|---|
| 449 |             }
 | 
|---|
| 450 | 
 | 
|---|
| 451 |             AlignTo2880Bytes();
 | 
|---|
| 452 | 
 | 
|---|
| 453 |             int64_t heap_size = 0;
 | 
|---|
| 454 |             int64_t compressed_offset = 0;
 | 
|---|
| 455 |             for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
 | 
|---|
| 456 |             {
 | 
|---|
| 457 |                 compressed_offset += sizeof(FITS::TileHeader);
 | 
|---|
| 458 |                 heap_size         += sizeof(FITS::TileHeader);
 | 
|---|
| 459 |                 for (uint32_t j=0; j<it->size(); j++)
 | 
|---|
| 460 |                 {
 | 
|---|
| 461 |                     heap_size += (*it)[j].first;
 | 
|---|
| 462 |                     (*it)[j].second = compressed_offset;
 | 
|---|
| 463 |                     compressed_offset += (*it)[j].first;
 | 
|---|
| 464 |                     if ((*it)[j].first == 0)
 | 
|---|
| 465 |                         (*it)[j].second = 0;
 | 
|---|
| 466 |                 }
 | 
|---|
| 467 |             }
 | 
|---|
| 468 | 
 | 
|---|
| 469 |             const uint32_t shrink_factor = ShrinkCatalog();
 | 
|---|
| 470 | 
 | 
|---|
| 471 |             //update header keywords
 | 
|---|
| 472 |             SetInt("ZNAXIS1", fRealRowWidth);
 | 
|---|
| 473 |             SetInt("ZNAXIS2", fTable.num_rows);
 | 
|---|
| 474 | 
 | 
|---|
| 475 |             SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
 | 
|---|
| 476 | 
 | 
|---|
| 477 |             const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
 | 
|---|
| 478 |             const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;
 | 
|---|
| 479 | 
 | 
|---|
| 480 |             SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
 | 
|---|
| 481 |             SetInt("NAXIS1", total_catalog_width);
 | 
|---|
| 482 |             SetInt("NAXIS2", total_num_tiles_written);
 | 
|---|
| 483 |             SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
 | 
|---|
| 484 | 
 | 
|---|
| 485 |             const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
 | 
|---|
| 486 |             SetFloat("ZRATIO", compression_ratio);
 | 
|---|
| 487 | 
 | 
|---|
| 488 |             //add to the heap size the size of the gap between the catalog and the actual heap
 | 
|---|
| 489 |             heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
 | 
|---|
| 490 | 
 | 
|---|
| 491 |             SetInt("PCOUNT", heap_size, "size of special data area");
 | 
|---|
| 492 | 
 | 
|---|
| 493 |             //Just for updating the fCatalogSum value
 | 
|---|
| 494 |             WriteCatalog();
 | 
|---|
| 495 | 
 | 
|---|
| 496 |             fDataSum += fCatalogSum;
 | 
|---|
| 497 | 
 | 
|---|
| 498 |             const Checksum checksm = UpdateHeaderChecksum();
 | 
|---|
| 499 | 
 | 
|---|
| 500 |             std::ofstream::close();
 | 
|---|
| 501 | 
 | 
|---|
| 502 |             fSmartBuffer = std::shared_ptr<char>();
 | 
|---|
| 503 | 
 | 
|---|
| 504 |             //restore the number of rows per tile in case the catalog has been shrinked
 | 
|---|
| 505 |             if (shrink_factor != 1)
 | 
|---|
| 506 |                 fNumRowsPerTile /= shrink_factor;
 | 
|---|
| 507 | 
 | 
|---|
| 508 |             if ((checksm+fDataSum).valid())
 | 
|---|
| 509 |                 return true;
 | 
|---|
| 510 | 
 | 
|---|
| 511 |             std::ostringstream sout;
 | 
|---|
| 512 |             sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
 | 
|---|
| 513 | #ifdef __EXCEPTIONS
 | 
|---|
| 514 |             throw std::runtime_error(sout.str());
 | 
|---|
| 515 | #else
 | 
|---|
| 516 |             gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
 | 
|---|
| 517 |             return false;
 | 
|---|
| 518 | #endif
 | 
|---|
| 519 |         }
 | 
|---|
| 520 | 
 | 
|---|
| 521 |         /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
 | 
|---|
| 522 |         bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
 | 
|---|
| 523 |                        const std::string& comment="", bool addHeaderKeys=true)
 | 
|---|
| 524 |         {
 | 
|---|
| 525 |             return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
 | 
|---|
| 526 |         }
 | 
|---|
| 527 | 
 | 
|---|
| 528 |         /// Overload of the simplified compressed version
 | 
|---|
| 529 |         bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
 | 
|---|
| 530 |                        const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
 | 
|---|
| 531 |         {
 | 
|---|
| 532 |             if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
 | 
|---|
| 533 |                 return false;
 | 
|---|
| 534 | 
 | 
|---|
| 535 |             const size_t size = SizeFromType(typechar);
 | 
|---|
| 536 | 
 | 
|---|
| 537 |             Table::Column col;
 | 
|---|
| 538 |             col.name   = name;
 | 
|---|
| 539 |             col.type   = typechar;
 | 
|---|
| 540 |             col.num    = cnt;
 | 
|---|
| 541 |             col.size   = size;
 | 
|---|
| 542 |             col.offset = fRealRowWidth;
 | 
|---|
| 543 | 
 | 
|---|
| 544 |             fRealRowWidth += size*cnt;
 | 
|---|
| 545 | 
 | 
|---|
| 546 |             fRealColumns.emplace_back(col, comp);
 | 
|---|
| 547 | 
 | 
|---|
| 548 |             SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+CommentFromType(typechar));
 | 
|---|
| 549 |             SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
 | 
|---|
| 550 | 
 | 
|---|
| 551 |             return true;
 | 
|---|
| 552 |         }
 | 
|---|
| 553 | 
 | 
|---|
| 554 |         /// Get and set the actual number of threads for this object
 | 
|---|
| 555 |         int32_t GetNumThreads() const { return fNumQueues; }
 | 
|---|
| 556 |         bool SetNumThreads(uint32_t num)
 | 
|---|
| 557 |         {
 | 
|---|
| 558 |             if (is_open())
 | 
|---|
| 559 |             {
 | 
|---|
| 560 | #ifdef __EXCEPTIONS
 | 
|---|
| 561 |                 throw std::runtime_error("File must be closed before changing the number of compression threads");
 | 
|---|
| 562 | #else
 | 
|---|
| 563 |                 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << std::endl;
 | 
|---|
| 564 | #endif
 | 
|---|
| 565 |                 return false;
 | 
|---|
| 566 |             }
 | 
|---|
| 567 | 
 | 
|---|
| 568 |             //get number of physically available threads
 | 
|---|
| 569 | #ifdef HAVE_BOOST_THREAD
 | 
|---|
| 570 |             unsigned int num_available_cores = boost::thread::hardware_concurrency();
 | 
|---|
| 571 | #else
 | 
|---|
| 572 |             unsigned int num_available_cores = std::thread::hardware_concurrency();
 | 
|---|
| 573 | #endif
 | 
|---|
| 574 |             // could not detect number of available cores from system properties...
 | 
|---|
| 575 |             if (num_available_cores == 0)
 | 
|---|
| 576 |                 num_available_cores = 1;
 | 
|---|
| 577 | 
 | 
|---|
| 578 |             // leave one core for the main thread and one for the writing
 | 
|---|
| 579 |             if (num > num_available_cores)
 | 
|---|
| 580 |                 num = num_available_cores>2 ? num_available_cores-2 : 1;
 | 
|---|
| 581 | 
 | 
|---|
| 582 |             fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
 | 
|---|
| 583 |             fNumQueues = num;
 | 
|---|
| 584 | 
 | 
|---|
| 585 |             return true;
 | 
|---|
| 586 |         }
 | 
|---|
| 587 | 
 | 
|---|
| 588 |         uint32_t GetNumTiles() const { return fNumTiles; }
 | 
|---|
| 589 |         void SetNumTiles(uint32_t num) { fNumTiles=num; }
 | 
|---|
| 590 | 
 | 
|---|
| 591 | protected:
 | 
|---|
| 592 | 
 | 
|---|
| 593 |         /// Allocates the required objects.
 | 
|---|
| 594 |         void reallocateBuffers()
 | 
|---|
| 595 |         {
 | 
|---|
| 596 |             const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(FITS::BlockHeader) + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
 | 
|---|
| 597 |             fMemPool.setChunkSize(chunk_size);
 | 
|---|
| 598 | 
 | 
|---|
| 599 |             fSmartBuffer = fMemPool.malloc();
 | 
|---|
| 600 |             fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
 | 
|---|
| 601 |         }
 | 
|---|
| 602 | 
 | 
|---|
| 603 |         /// Actually does the writing to disk (and checksuming)
 | 
|---|
| 604 |         /// @param src the buffer to write
 | 
|---|
| 605 |         /// @param sizeToWrite how many bytes should be written
 | 
|---|
| 606 |         /// @return the state of the file
 | 
|---|
| 607 |         bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
 | 
|---|
| 608 |         {
 | 
|---|
| 609 |             char* checkSumPointer = src+4;
 | 
|---|
| 610 |             int32_t extraBytes = 0;
 | 
|---|
| 611 |             uint32_t sizeToChecksum = sizeToWrite;
 | 
|---|
| 612 | 
 | 
|---|
| 613 |             //should we extend the array to the left ?
 | 
|---|
| 614 |             if (fCheckOffset != 0)
 | 
|---|
| 615 |             {
 | 
|---|
| 616 |                 sizeToChecksum  += fCheckOffset;
 | 
|---|
| 617 |                 checkSumPointer -= fCheckOffset;
 | 
|---|
| 618 |                 memset(checkSumPointer, 0, fCheckOffset);
 | 
|---|
| 619 |             }
 | 
|---|
| 620 | 
 | 
|---|
| 621 |             //should we extend the array to the right ?
 | 
|---|
| 622 |             if (sizeToChecksum%4 != 0)
 | 
|---|
| 623 |             {
 | 
|---|
| 624 |                 extraBytes = 4 - (sizeToChecksum%4);
 | 
|---|
| 625 |                 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
 | 
|---|
| 626 |                 sizeToChecksum += extraBytes;
 | 
|---|
| 627 |             }
 | 
|---|
| 628 | 
 | 
|---|
| 629 |             //do the checksum
 | 
|---|
| 630 |             fDataSum.add(checkSumPointer, sizeToChecksum);
 | 
|---|
| 631 | 
 | 
|---|
| 632 |             fCheckOffset = (4 - extraBytes)%4;
 | 
|---|
| 633 | 
 | 
|---|
| 634 |             //write data to disk
 | 
|---|
| 635 |             write(src+4, sizeToWrite);
 | 
|---|
| 636 | 
 | 
|---|
| 637 |             return good();
 | 
|---|
| 638 |         }
 | 
|---|
| 639 | 
 | 
|---|
| 640 |         /// Compress a given buffer based on the target. This is the method executed by the threads
 | 
|---|
| 641 |         /// @param target the struct hosting the parameters of the compression
 | 
|---|
| 642 |         /// @return number of bytes of the compressed data, or always 1 when used by the Queues
 | 
|---|
| 643 |         bool CompressBuffer(const CompressionTarget& target)
 | 
|---|
| 644 |         {
 | 
|---|
| 645 |             //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
 | 
|---|
| 646 |             //Really do not understand what's wrong...
 | 
|---|
| 647 |             //calibrate data if required
 | 
|---|
| 648 |             const uint32_t thisRoundNumRows  = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
 | 
|---|
| 649 |             for (uint32_t i=0;i<thisRoundNumRows;i++)
 | 
|---|
| 650 |             {
 | 
|---|
| 651 |                 char* target_location = target.src.get() + fRealRowWidth*i;
 | 
|---|
| 652 |                 DrsOffsetCalibrate(target_location);
 | 
|---|
| 653 |             }
 | 
|---|
| 654 | #ifdef __EXCEPTIONS
 | 
|---|
| 655 |             try
 | 
|---|
| 656 |             {
 | 
|---|
| 657 | #endif
 | 
|---|
| 658 |                 //transpose the original data
 | 
|---|
| 659 |                 copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
 | 
|---|
| 660 | 
 | 
|---|
| 661 |                 //compress the buffer
 | 
|---|
| 662 |                 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
 | 
|---|
| 663 | 
 | 
|---|
| 664 |                 //post the result to the writing queue
 | 
|---|
| 665 |                 //get a copy so that it becomes non-const
 | 
|---|
| 666 |                 fWriteToDiskQueue.emplace(target.target, compressed_size);
 | 
|---|
| 667 | 
 | 
|---|
| 668 | #ifdef __EXCEPTIONS
 | 
|---|
| 669 |             }
 | 
|---|
| 670 |             catch (...)
 | 
|---|
| 671 |             {
 | 
|---|
| 672 |                 fThreadsException = std::current_exception();
 | 
|---|
| 673 |                 if (fNumQueues == 0)
 | 
|---|
| 674 |                     std::rethrow_exception(fThreadsException);
 | 
|---|
| 675 |             }
 | 
|---|
| 676 | #endif
 | 
|---|
| 677 | 
 | 
|---|
| 678 |             return true;
 | 
|---|
| 679 |         }
 | 
|---|
| 680 | 
 | 
|---|
| 681 |         /// Write one compressed tile to disk. This is the method executed by the writing thread
 | 
|---|
| 682 |         /// @param target the struct hosting the write parameters
 | 
|---|
| 683 |         bool WriteBufferToDisk(const WriteTarget& target)
 | 
|---|
| 684 |         {
 | 
|---|
| 685 |             //is this the tile we're supposed to write ?
 | 
|---|
| 686 |             if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
 | 
|---|
| 687 |                 return false;
 | 
|---|
| 688 | 
 | 
|---|
| 689 |             fLatestWrittenTile++;
 | 
|---|
| 690 | 
 | 
|---|
| 691 | #ifdef __EXCEPTIONS
 | 
|---|
| 692 |             try
 | 
|---|
| 693 |             {
 | 
|---|
| 694 | #endif
 | 
|---|
| 695 |                 //could not write the data to disk
 | 
|---|
| 696 |                 if (!writeCompressedDataToDisk(target.data.get(), target.size))
 | 
|---|
| 697 |                     fErrno = errno;
 | 
|---|
| 698 | #ifdef __EXCEPTIONS
 | 
|---|
| 699 |             }
 | 
|---|
| 700 |             catch (...)
 | 
|---|
| 701 |             {
 | 
|---|
| 702 |                 fThreadsException = std::current_exception();
 | 
|---|
| 703 |                 if (fNumQueues == 0)
 | 
|---|
| 704 |                     std::rethrow_exception(fThreadsException);
 | 
|---|
| 705 |             }
 | 
|---|
| 706 | #endif
 | 
|---|
| 707 |             return true;
 | 
|---|
| 708 |         }
 | 
|---|
| 709 | 
 | 
|---|
| 710 |         /// Compress a given buffer based on its source and destination
 | 
|---|
| 711 |         //src cannot be const, as applySMOOTHING is done in place
 | 
|---|
| 712 |         /// @param dest the buffer hosting the compressed data
 | 
|---|
| 713 |         /// @param src the buffer hosting the transposed data
 | 
|---|
| 714 |         /// @param num_rows the number of uncompressed rows in the transposed buffer
 | 
|---|
| 715 |         /// @param the number of bytes of the compressed data
 | 
|---|
| 716 |         uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
 | 
|---|
| 717 |         {
 | 
|---|
| 718 |             const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
 | 
|---|
| 719 |             uint32_t       offset           = 0;
 | 
|---|
| 720 | 
 | 
|---|
| 721 |             //skip the checksum reserved area
 | 
|---|
| 722 |             dest += 4;
 | 
|---|
| 723 | 
 | 
|---|
| 724 |             //skip the 'TILE' marker and tile size entry
 | 
|---|
| 725 |             uint64_t compressedOffset = sizeof(FITS::TileHeader);
 | 
|---|
| 726 | 
 | 
|---|
| 727 |             //now compress each column one by one by calling compression on arrays
 | 
|---|
| 728 |             for (uint32_t i=0;i<fRealColumns.size();i++)
 | 
|---|
| 729 |             {
 | 
|---|
| 730 |                 catalog_row[i].second = compressedOffset;
 | 
|---|
| 731 | 
 | 
|---|
| 732 |                 if (fRealColumns[i].col.num == 0)
 | 
|---|
| 733 |                     continue;
 | 
|---|
| 734 | 
 | 
|---|
| 735 |                 FITS::Compression& head = fRealColumns[i].block_head;
 | 
|---|
| 736 | 
 | 
|---|
| 737 |                 //set the default byte telling if uncompressed the compressed Flag
 | 
|---|
| 738 |                 const uint64_t previousOffset = compressedOffset;
 | 
|---|
| 739 | 
 | 
|---|
| 740 |                 //skip header data
 | 
|---|
| 741 |                 compressedOffset += head.getSizeOnDisk();
 | 
|---|
| 742 | 
 | 
|---|
| 743 |                 for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
 | 
|---|
| 744 |                 {
 | 
|---|
| 745 |                     switch (head.getProc(j))
 | 
|---|
| 746 |                     {
 | 
|---|
| 747 |                     case FITS::kFactRaw:
 | 
|---|
| 748 |                         compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src  + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
 | 
|---|
| 749 |                         break;
 | 
|---|
| 750 | 
 | 
|---|
| 751 |                     case FITS::kFactSmoothing:
 | 
|---|
| 752 |                         applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
 | 
|---|
| 753 |                         break;
 | 
|---|
| 754 | 
 | 
|---|
| 755 |                     case FITS::kFactHuffman16:
 | 
|---|
| 756 |                         if (head.getOrdering() == FITS::kOrderByCol)
 | 
|---|
| 757 |                             compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
 | 
|---|
| 758 |                         else
 | 
|---|
| 759 |                             compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
 | 
|---|
| 760 |                         break;
 | 
|---|
| 761 |                     }
 | 
|---|
| 762 |                 }
 | 
|---|
| 763 | 
 | 
|---|
| 764 |                 //check if compressed size is larger than uncompressed
 | 
|---|
| 765 |                 //if so set flag and redo it uncompressed
 | 
|---|
| 766 |                 if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
 | 
|---|
| 767 |                 {
 | 
|---|
| 768 |                     //de-smooth !
 | 
|---|
| 769 |                     if (head.getProc(0) == FITS::kFactSmoothing)
 | 
|---|
| 770 |                         UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
 | 
|---|
| 771 | 
 | 
|---|
| 772 |                     FITS::Compression he;
 | 
|---|
| 773 | 
 | 
|---|
| 774 |                     compressedOffset = previousOffset + he.getSizeOnDisk();
 | 
|---|
| 775 |                     compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
 | 
|---|
| 776 | 
 | 
|---|
| 777 |                     he.SetBlockSize(compressedOffset - previousOffset);
 | 
|---|
| 778 |                     he.Memcpy(dest+previousOffset);
 | 
|---|
| 779 | 
 | 
|---|
| 780 |                     offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
 | 
|---|
| 781 | 
 | 
|---|
| 782 |                     catalog_row[i].first = compressedOffset - catalog_row[i].second;
 | 
|---|
| 783 |                     continue;
 | 
|---|
| 784 |                 }
 | 
|---|
| 785 | 
 | 
|---|
| 786 |                 head.SetBlockSize(compressedOffset - previousOffset);
 | 
|---|
| 787 |                 head.Memcpy(dest + previousOffset);
 | 
|---|
| 788 | 
 | 
|---|
| 789 |                 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
 | 
|---|
| 790 |                 catalog_row[i].first = compressedOffset - catalog_row[i].second;
 | 
|---|
| 791 |             }
 | 
|---|
| 792 | 
 | 
|---|
| 793 |             const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
 | 
|---|
| 794 |             memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
 | 
|---|
| 795 | 
 | 
|---|
| 796 |             return compressedOffset;
 | 
|---|
| 797 |         }
 | 
|---|
| 798 | 
 | 
|---|
| 799 |         /// Transpose a tile to a new buffer
 | 
|---|
| 800 |         /// @param src buffer hosting the regular, row-ordered data
 | 
|---|
| 801 |         /// @param dest the target buffer that will receive the transposed data
 | 
|---|
| 802 |         void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
 | 
|---|
| 803 |         {
 | 
|---|
| 804 |             const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
 | 
|---|
| 805 | 
 | 
|---|
| 806 |             //copy the tile and transpose it
 | 
|---|
| 807 |             for (uint32_t i=0;i<fRealColumns.size();i++)
 | 
|---|
| 808 |             {
 | 
|---|
| 809 |                 switch (fRealColumns[i].block_head.getOrdering())
 | 
|---|
| 810 |                 {
 | 
|---|
| 811 |                 case FITS::kOrderByRow:
 | 
|---|
| 812 |                     //regular, "semi-transposed" copy
 | 
|---|
| 813 |                     for (uint32_t k=0;k<thisRoundNumRows;k++)
 | 
|---|
| 814 |                     {
 | 
|---|
| 815 |                         memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
 | 
|---|
| 816 |                         dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
 | 
|---|
| 817 |                     }
 | 
|---|
| 818 |                     break;
 | 
|---|
| 819 | 
 | 
|---|
| 820 |                 case FITS::kOrderByCol:
 | 
|---|
| 821 |                     //transposed copy
 | 
|---|
| 822 |                     for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
 | 
|---|
| 823 |                         for (uint32_t k=0;k<thisRoundNumRows;k++)
 | 
|---|
| 824 |                         {
 | 
|---|
| 825 |                             memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
 | 
|---|
| 826 |                             dest += fRealColumns[i].col.size;
 | 
|---|
| 827 |                         }
 | 
|---|
| 828 |                     break;
 | 
|---|
| 829 |                 };
 | 
|---|
| 830 |             }
 | 
|---|
| 831 |         }
 | 
|---|
| 832 | 
 | 
|---|
| 833 |         /// Specific compression functions
 | 
|---|
| 834 |         /// @param dest the target buffer
 | 
|---|
| 835 |         /// @param src the source buffer
 | 
|---|
| 836 |         /// @param size number of bytes to copy
 | 
|---|
| 837 |         /// @return number of bytes written
 | 
|---|
| 838 |         uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
 | 
|---|
| 839 |         {
 | 
|---|
| 840 |             memcpy(dest, src, size);
 | 
|---|
| 841 |             return size;
 | 
|---|
| 842 |         }
 | 
|---|
| 843 | 
 | 
|---|
| 844 |         /// Do huffman encoding
 | 
|---|
| 845 |         /// @param dest the buffer that will receive the compressed data
 | 
|---|
| 846 |         /// @param src the buffer hosting the transposed data
 | 
|---|
| 847 |         /// @param numRows number of rows of data in the transposed buffer
 | 
|---|
| 848 |         /// @param sizeOfElems size in bytes of one data elements
 | 
|---|
| 849 |         /// @param numRowElems number of elements on each row
 | 
|---|
| 850 |         /// @return number of bytes written
 | 
|---|
| 851 |         uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
 | 
|---|
| 852 |         {
 | 
|---|
| 853 |             std::string huffmanOutput;
 | 
|---|
| 854 |             uint32_t previousHuffmanSize = 0;
 | 
|---|
| 855 | 
 | 
|---|
| 856 |             //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
 | 
|---|
| 857 |             if (numRows < 2)
 | 
|---|
| 858 |                 return numRows*sizeOfElems*numRowElems + 1000;
 | 
|---|
| 859 | 
 | 
|---|
| 860 |             if (sizeOfElems < 2 )
 | 
|---|
| 861 |             {
 | 
|---|
| 862 | #ifdef __EXCEPTIONS
 | 
|---|
| 863 |                 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
 | 
|---|
| 864 | #else
 | 
|---|
| 865 |                 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
 | 
|---|
| 866 |                 return 0;
 | 
|---|
| 867 | #endif
 | 
|---|
| 868 |             }
 | 
|---|
| 869 | 
 | 
|---|
| 870 |             uint32_t huffmanOffset = 0;
 | 
|---|
| 871 |             for (uint32_t j=0;j<numRowElems;j++)
 | 
|---|
| 872 |             {
 | 
|---|
| 873 |                 Huffman::Encode(huffmanOutput,
 | 
|---|
| 874 |                                 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
 | 
|---|
| 875 |                                 numRows*(sizeOfElems/2));
 | 
|---|
| 876 |                 reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
 | 
|---|
| 877 |                 huffmanOffset += sizeof(uint32_t);
 | 
|---|
| 878 |                 previousHuffmanSize = huffmanOutput.size();
 | 
|---|
| 879 |             }
 | 
|---|
| 880 | 
 | 
|---|
| 881 |             const size_t totalSize = huffmanOutput.size() + huffmanOffset;
 | 
|---|
| 882 | 
 | 
|---|
| 883 |             //only copy if not larger than not-compressed size
 | 
|---|
| 884 |             if (totalSize < numRows*sizeOfElems*numRowElems)
 | 
|---|
| 885 |                 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
 | 
|---|
| 886 | 
 | 
|---|
| 887 |             return totalSize;
 | 
|---|
| 888 |         }
 | 
|---|
| 889 | 
 | 
|---|
| 890 |         /// Applies Thomas' DRS4 smoothing
 | 
|---|
| 891 |         /// @param data where to apply it
 | 
|---|
| 892 |         /// @param numElems how many elements of type int16_t are stored in the buffer
 | 
|---|
| 893 |         /// @return number of bytes modified
 | 
|---|
| 894 |         uint32_t applySMOOTHING(char* data, uint32_t numElems)
 | 
|---|
| 895 |         {
 | 
|---|
| 896 |             int16_t* short_data = reinterpret_cast<int16_t*>(data);
 | 
|---|
| 897 |             for (int j=numElems-1;j>1;j--)
 | 
|---|
| 898 |                 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
 | 
|---|
| 899 | 
 | 
|---|
| 900 |             return numElems*sizeof(int16_t);
 | 
|---|
| 901 |         }
 | 
|---|
| 902 | 
 | 
|---|
| 903 |         /// Apply the inverse transform of the integer smoothing
 | 
|---|
| 904 |         /// @param data where to apply it
 | 
|---|
| 905 |         /// @param numElems how many elements of type int16_t are stored in the buffer
 | 
|---|
| 906 |         /// @return number of bytes modified
 | 
|---|
| 907 |         uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
 | 
|---|
| 908 |         {
 | 
|---|
| 909 |             int16_t* short_data = reinterpret_cast<int16_t*>(data);
 | 
|---|
| 910 |             for (uint32_t j=2;j<numElems;j++)
 | 
|---|
| 911 |                 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
 | 
|---|
| 912 | 
 | 
|---|
| 913 |             return numElems*sizeof(uint16_t);
 | 
|---|
| 914 |         }
 | 
|---|
| 915 | 
 | 
|---|
| 916 | 
 | 
|---|
| 917 | 
 | 
|---|
| 918 |         //thread related stuff
 | 
|---|
| 919 |         MemoryManager   fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
 | 
|---|
| 920 |         int32_t         fNumQueues;         ///< Current number of threads that will be used by this object
 | 
|---|
| 921 |         uint64_t        fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
 | 
|---|
| 922 |         int32_t         fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
 | 
|---|
| 923 | 
 | 
|---|
| 924 |         std::vector<Queue<CompressionTarget>>     fCompressionQueues;  ///< Processing queues (=threads)
 | 
|---|
| 925 |         Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;   ///< Writing queue (=thread)
 | 
|---|
| 926 | 
 | 
|---|
| 927 |         // catalog related stuff
 | 
|---|
| 928 |         CatalogType fCatalog;               ///< Catalog for this file
 | 
|---|
| 929 |         uint32_t    fCatalogSize;           ///< Actual catalog size (.size() is slow on large lists)
 | 
|---|
| 930 |         uint32_t    fNumTiles;              ///< Number of pre-reserved tiles
 | 
|---|
| 931 |         uint32_t    fNumRowsPerTile;        ///< Number of rows per tile
 | 
|---|
| 932 |         off_t       fCatalogOffset;         ///< Offset of the catalog from the beginning of the file
 | 
|---|
| 933 | 
 | 
|---|
| 934 |         // checksum related stuff
 | 
|---|
| 935 |         Checksum fCatalogSum;    ///< Checksum of the catalog
 | 
|---|
| 936 |         Checksum fRawSum;        ///< Raw sum (specific to FACT)
 | 
|---|
| 937 |         int32_t  fCheckOffset;   ///< offset to the data pointer to calculate the checksum
 | 
|---|
| 938 | 
 | 
|---|
| 939 |         // data layout related stuff
 | 
|---|
| 940 |         /// Regular columns augmented with compression informations
 | 
|---|
| 941 |         struct CompressedColumn
 | 
|---|
| 942 |         {
 | 
|---|
| 943 |             CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
 | 
|---|
| 944 |                 block_head(h)
 | 
|---|
| 945 |             {}
 | 
|---|
| 946 |             Table::Column col;             ///< the regular column entry
 | 
|---|
| 947 |             FITS::Compression block_head;  ///< the compression data associated with that column
 | 
|---|
| 948 |         };
 | 
|---|
| 949 |         std::vector<CompressedColumn> fRealColumns;     ///< Vector hosting the columns of the file
 | 
|---|
| 950 |         uint32_t                      fRealRowWidth;    ///< Width in bytes of one uncompressed row
 | 
|---|
| 951 |         std::shared_ptr<char>         fSmartBuffer;     ///< Smart pointer to the buffer where the incoming rows are written
 | 
|---|
| 952 |         std::vector<char>             fRawSumBuffer;    ///< buffer used for checksuming the incoming data, before compression
 | 
|---|
| 953 | 
 | 
|---|
| 954 | #ifdef __EXCEPTIONS
 | 
|---|
| 955 |         std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
 | 
|---|
| 956 | #endif
 | 
|---|
| 957 |         int                fErrno;            ///< propagate errno to main thread
 | 
|---|
| 958 | };
 | 
|---|
| 959 | 
 | 
|---|
| 960 | #endif
 | 
|---|