Changeset 17932


Ignore:
Timestamp:
07/23/14 15:39:06 (10 years ago)
Author:
tbretz
Message:
Revived the valid state; make sure the first message (which might still be junk from a non empty buffer) is properly treated; make sure that an automatic re-connect properly works
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/sqmctrl.cc

    r17931 r17932  
    3030private:
    3131    bool     fIsVerbose;
     32    bool     fFirstMessage;
     33    bool     fValid;
    3234    uint16_t fTimeout;
    3335
     
    5456                Error(str);
    5557            }
    56             PostClose(err!=ba::error::basic_errors::operation_aborted);
     58            PostClose(false);//err!=ba::error::basic_errors::operation_aborted);
    5759            return;
    5860        }
     
    7173
    7274        if (fIsVerbose)
    73             Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << endl;
     75        {
     76            Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << "|" << endl;
     77            // Out() << Time().GetAsStr("%H:%M:%S.%f") << "[ " << vec.size() << "]: ";
     78            // for (auto it=vec.begin(); it!=vec.end(); it++)
     79            //     Out() << *it << "|";
     80            // Out() << endl;
     81        }
    7482
    7583        vector<string> vec;
    7684        boost::split(vec, buffer, boost::is_any_of(","));
    7785
    78         // Send next request in fTimeout milliseconds calculated from
    79         // the last request onwards.
    80         fTrigger.expires_at(fTrigger.expires_at()+boost::posix_time::milliseconds(fTimeout));
    81         fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
    82                                           this, dummy::error));
    8386        try
    8487        {
     
    8790
    8891            if (vec[0]!="r")
    89                 throw runtime_error("Not a 'reading request' answer: "+vec[0]);
     92                throw runtime_error("Not a proper answer["+to_string(vec[0].size())+"]: "+vec[0]);
    9093
    9194            SQM::Data data;
     
    98101
    99102            Update(data);
     103
     104            fValid = true;
    100105        }
    101106        catch (const exception &e)
    102107        {
    103             Error("Parsing received data failed ["+string(e.what())+"]");
    104             Error("Received: "+buffer);
    105             PostClose(true);
    106         }
     108            if (fFirstMessage)
     109                Warn("Parsing first message failed ["+string(e.what())+"]");
     110            else
     111            {
     112                Error("Parsing received message failed ["+string(e.what())+"]");
     113                Error("Received: "+buffer);
     114                PostClose(false);
     115                return;
     116            }
     117        }
     118
     119        // Send next request in fTimeout milliseconds calculated from
     120        // the last request onwards.
     121        fTrigger.expires_at(fTrigger.expires_at()+boost::posix_time::milliseconds(fTimeout));
     122        fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
     123                                        this, dummy::error));
     124
     125        fFirstMessage = false;
    107126    }
    108127
     
    116135            Error(str);
    117136
    118             PostClose(true);
     137            PostClose(false);
    119138            return;
    120139        }
     
    124143            // For example: Here we could schedule a new accept if we
    125144            // would not want to allow two connections at the same time.
    126             //PostClose(true);
    127             return;
    128         }
     145            PostClose(true);
     146            return;
     147        }
     148
     149        // This is called if the deadline has been shifted
     150        if (error==ba::error::basic_errors::operation_aborted)
     151            return;
    129152
    130153        // Check whether the deadline has passed. We compare the deadline
     
    139162        Error(str);
    140163
    141         PostClose(true);
     164        PostClose(false);
     165
     166        fInTimeout.expires_from_now(boost::posix_time::milliseconds(1000));
     167        fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
     168                                          this, dummy::error));
    142169    }
    143170
    144171    void HandleRequestTrigger(const bs::error_code &error)
    145172    {
     173
    146174        // 125: Operation canceled (bs::error_code(125, bs::system_category))
    147175        if (error && error!=ba::error::basic_errors::operation_aborted)
     
    151179            Error(str);
    152180
    153             PostClose(true);
     181            PostClose(false);
    154182            return;
    155183        }
     
    177205        PostMessage(string("rx\n"), 3);
    178206
     207        // Do not schedule two reads
     208        if (!fFirstMessage)
     209        {
     210            async_read_until(*this, fBuffer, '\n',
     211                             boost::bind(&ConnectionSQM::HandleRead, this,
     212                                         dummy::error, dummy::bytes_transferred));
     213        }
     214
     215        fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*1.5));
     216        fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
     217                                          this, dummy::error));
     218    }
     219
     220private:
     221    // This is called when a connection was established
     222    void ConnectionEstablished()
     223    {
     224        fValid = false;
     225        fFirstMessage = true;
     226
     227        // Empty a possible buffer first before we start reading
     228        // otherwise reading and writing might not be consecutive
    179229        async_read_until(*this, fBuffer, '\n',
    180230                         boost::bind(&ConnectionSQM::HandleRead, this,
    181231                                     dummy::error, dummy::bytes_transferred));
    182232
    183         fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*1.5));
    184         fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
    185                                           this, dummy::error));
    186     }
    187 
    188 private:
    189     // This is called when a connection was established
    190     void ConnectionEstablished()
    191     {
    192         StartReadReport();
    193         fTrigger.expires_at(Time());
     233        // If there was no immediate answer, send a request
     234        fTrigger.expires_at(Time()+boost::posix_time::milliseconds(1000));
     235        fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
     236                                        this, dummy::error));
    194237    }
    195238
     
    217260    int GetState() const
    218261    {
    219         return is_open() ? SQM::State::kConnected : SQM::State::kDisconnected;
     262        if (!is_open())
     263            return  SQM::State::kDisconnected;
     264
     265        return fValid ? SQM::State::kValid : SQM::State::kConnected;
    220266    }
    221267};
Note: See TracChangeset for help on using the changeset viewer.