Index: /trunk/Mars/mcore/zofits.h
===================================================================
--- /trunk/Mars/mcore/zofits.h	(revision 17268)
+++ /trunk/Mars/mcore/zofits.h	(revision 17269)
@@ -46,5 +46,5 @@
         struct WriteTarget
         {
-            bool operator < (const WriteTarget& other)
+            bool operator < (const WriteTarget& other) const
             {
                 return tile_num < other.tile_num;
@@ -141,4 +141,7 @@
             ofits::WriteTableHeader(name);
 
+            fCompressionQueues.front().setPromptExecution(fNumQueues==0);
+            fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
+
             if (fNumQueues != 0)
             {
@@ -153,4 +156,7 @@
             //mark that no tile has been written so far
             fLatestWrittenTile = -1;
+
+            //no wiring error (in the writing of the data) has occured so far
+            fErrno = 0;
 
             return good();
@@ -316,36 +322,7 @@
             }
 
-            CompressionTarget compress_target(AddOneCatalogRow());
-            SetNextCompression(compress_target);
-
-            //no worker threads. do everything in-line
-            if (fNumQueues == 0)
-            {
-                const uint64_t size_to_write = CompressBuffer(compress_target);
-
-                const WriteTarget write_target(compress_target.target, size_to_write);
-                if (!WriteBufferToDisk(write_target))
-                    throw std::runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");
-
-                // The correct 'errno' is set, because it is the main thread.
-                return good();
-            }
-
-            //if all queues are empty, use queue 0
-            uint32_t min_index     = 0;
-            uint32_t min_size      = std::numeric_limits<uint32_t>::max();
-            uint32_t current_index = 0;
-
-            for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
-            {
-                if (it->size() < min_size)
-                {
-                    min_index = current_index;
-                    min_size = it->size();
-                }
-                current_index++;
-            }
-
-            if (!fCompressionQueues[min_index].emplace(compress_target))
+            // use the least occupied queue
+            const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
+            if (!imin->emplace(InitNextCompression()))
                 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
 
@@ -364,6 +341,8 @@
         /// Setup the environment to compress yet another tile of data
         /// @param target the struct where to host the produced parameters
-        void SetNextCompression(CompressionTarget& target)
-        {
+        CompressionTarget InitNextCompression()
+        {
+            CompressionTarget target(AddOneCatalogRow());
+
             //fill up compression target
             target.src            = fSmartBuffer;
@@ -379,4 +358,6 @@
             //get a new buffer to host the incoming data
             fSmartBuffer = fMemPool.malloc();
+
+            return target;
         }
 
@@ -389,40 +370,44 @@
 
             //did we write more rows than what the catalog could host ?
-            if (fCatalogSize > fNumTiles)
-            {
-                const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above
-
-                //shrink the catalog !
-                uint32_t entry_id = 1;
-                auto it = fCatalog.begin();
-                it++;
-                for (; it != fCatalog.end(); it++)
-                {
-                    if (entry_id >= fNumTiles) break;
-
-                    uint32_t target_id = entry_id*shrink_factor;
-
-                    auto jt = it;
-                    for (uint32_t i=0;i<target_id-entry_id;i++)
-                        jt++;
-
-                    *it = *jt;
-
-                    entry_id++;
-                }
-
-                const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
-                //remove the too many entries
-                for (uint32_t i=0;i<num_tiles_to_remove;i++)
-                {
-                    fCatalog.pop_back();
-                    fCatalogSize--;
-                }
-                //update header keywords
-                fNumRowsPerTile *= shrink_factor;
-
-                SetInt("ZTILELEN", fNumRowsPerTile);
-                SetInt("ZSHRINK",  shrink_factor);
-            }
+            if (fCatalogSize <= fNumTiles) // nothing to do
+                return;
+
+            //always exact as extra rows were added just above
+            const uint32_t shrink_factor = fCatalogSize / fNumTiles; 
+
+            //shrink the catalog !
+            uint32_t entry_id = 1;
+            auto it = fCatalog.begin();
+            it++;
+            for (; it != fCatalog.end(); it++)
+            {
+                if (entry_id >= fNumTiles)
+                    break;
+
+                const uint32_t target_id = entry_id*shrink_factor;
+
+                auto jt = it;
+                for (uint32_t i=0; i<target_id-entry_id; i++)
+                    jt++;
+
+                *it = *jt;
+
+                entry_id++;
+            }
+
+            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
+
+            //remove the too many entries
+            for (uint32_t i=0;i<num_tiles_to_remove;i++)
+            {
+                fCatalog.pop_back();
+                fCatalogSize--;
+            }
+
+            //update header keywords
+            fNumRowsPerTile *= shrink_factor;
+
+            SetInt("ZTILELEN", fNumRowsPerTile);
+            SetInt("ZSHRINK",  shrink_factor);
         }
 
@@ -447,20 +432,10 @@
 #endif
 
-            //write the last tile of data (if any
-            if (fTable.num_rows%fNumRowsPerTile != 0)
-            {
-                CompressionTarget compress_target(AddOneCatalogRow());
-                SetNextCompression(compress_target);
-
-                //set number of threads to zero before calling compressBuffer
-                const int32_t backup_num_queues = fNumQueues;
-                fNumQueues = 0;
-
-                const uint64_t size_to_write = CompressBuffer(compress_target);
-                fNumQueues = backup_num_queues;
-
-                const WriteTarget write_target(compress_target.target, size_to_write);
-                if (!WriteBufferToDisk(write_target))
-                    throw std::runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
+            //write the last tile of data (if any)
+            if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
+            {
+                fWriteToDiskQueue.enablePromptExecution();
+                fCompressionQueues.front().enablePromptExecution();
+                fCompressionQueues.front().emplace(InitNextCompression());
             }
 
@@ -563,5 +538,5 @@
 
         /// Get and set the actual number of threads for this object
-        int32_t GetNumThreads() const { return fNumQueues;}
+        int32_t GetNumThreads() const { return fNumQueues; }
         bool SetNumThreads(uint32_t num)
         {
@@ -590,9 +565,6 @@
                 num = num_available_cores>2 ? num_available_cores-2 : 1;
 
-            if (fCompressionQueues.size() != uint32_t(num))
-            {
-                fCompressionQueues.resize(num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
-                fNumQueues = num;
-            }
+            fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
+            fNumQueues = num;
 
             return true;
@@ -651,8 +623,6 @@
         /// @param target the struct hosting the parameters of the compression
         /// @return number of bytes of the compressed data, or always 1 when used by the Queues
-        uint32_t CompressBuffer(const CompressionTarget& target)
-        {
-            uint64_t compressed_size = 0;
-
+        bool CompressBuffer(const CompressionTarget& target)
+        {
             //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
             //Really do not understand what's wrong...
@@ -674,5 +644,10 @@
 
                 //compress the buffer
-                compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
+                const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
+
+                //post the result to the writing queue
+                //get a copy so that it becomes non-const
+                fWriteToDiskQueue.emplace(target.target, compressed_size);
+
 #ifdef __EXCEPTIONS
             }
@@ -685,13 +660,5 @@
 #endif
 
-            if (fNumQueues == 0)
-                return compressed_size;
-
-            //post the result to the writing queue
-            //get a copy so that it becomes non-const
-            fWriteToDiskQueue.emplace(target.target, compressed_size);
-
-            // if used by the queue, always return true as the elements are not ordered
-            return 1;
+            return true;
         }
 
