Changeset 17929 for trunk


Ignore:
Timestamp:
07/23/14 12:00:00 (10 years ago)
Author:
tbretz
Message:
It seems that the SQM does not allow asynchronous connections (requests and answers at the same time) very much like the bias crate. Therefore changed the logic of the requests to a synchronous approach.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/sqmctrl.cc

    r17918 r17929  
    2929
    3030private:
    31     bool fIsVerbose;
    32     bool fMsgReceived;
    33     bool fMsgValid;
    34 
     31    bool     fIsVerbose;
    3532    uint16_t fTimeout;
    3633
    3734    boost::asio::streambuf fBuffer;
     35
     36    boost::asio::deadline_timer fTrigger;
    3837
    3938    void HandleRead(const boost::system::error_code& err, size_t bytes_received)
     
    6463        if (!getline(is, buffer, '\n'))
    6564        {
    66             Error("Received message does not contain \\n... closing connection.");
     65            Fatal("Received message does not contain \\n... closing connection.");
    6766            PostClose(false);
    6867            return;
     
    7271
    7372        if (fIsVerbose)
    74             Out() << buffer << endl;
     73            Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << endl;
    7574
    7675        vector<string> vec;
    7776        boost::split(vec, buffer, boost::is_any_of(","));
    7877
     78        // Send next request in fTimeout milliseconds
     79        fTrigger.expires_from_now(boost::posix_time::milliseconds(fTimeout));
     80        fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
     81                                          this, dummy::error));
    7982        try
    8083        {
     
    8386
    8487            if (vec[0]!="r")
    85                 throw runtime_error("Not a 'reading request' answer.");
     88                throw runtime_error("Not a 'reading request' answer: "+vec[0]);
    8689
    8790            SQM::Data data;
     
    97100        catch (const exception &e)
    98101        {
    99             fMsgValid = false;
    100102            Error("Parsing received data failed ["+string(e.what())+"]");
    101103            Error("Received: "+buffer);
    102             PostClose(false);
    103             return;
    104         }
    105 
    106         fMsgValid    = true;
    107         fMsgReceived = true;
     104            PostClose(true);
     105        }
    108106    }
    109107
     
    114112        {
    115113            ostringstream str;
    116             str << "Read timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
     114            str << "ReadTimeout of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
    117115            Error(str);
    118116
     
    125123            // For example: Here we could schedule a new accept if we
    126124            // would not want to allow two connections at the same time.
    127             PostClose(true);
     125            //PostClose(true);
    128126            return;
    129127        }
     
    136134            return;
    137135
    138         if (fMsgReceived)
    139         {
    140             StartReadReport();
    141             return;
    142         }
    143 
    144136        ostringstream str;
    145         str << "No answer received from " << URL() << ".";
     137        str << "No valid answer received from " << URL() << " within " << ceil(fTimeout*1.5) << "ms";
    146138        Error(str);
    147139
     
    149141    }
    150142
     143    void HandleRequestTrigger(const bs::error_code &error)
     144    {
     145        // 125: Operation canceled (bs::error_code(125, bs::system_category))
     146        if (error && error!=ba::error::basic_errors::operation_aborted)
     147        {
     148            ostringstream str;
     149            str << "RequestTrigger failed of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
     150            Error(str);
     151
     152            PostClose(true);
     153            return;
     154        }
     155
     156        if (!is_open())
     157        {
     158            // For example: Here we could schedule a new accept if we
     159            // would not want to allow two connections at the same time.
     160            //PostClose(true);
     161            return;
     162        }
     163
     164        // Check whether the deadline has passed. We compare the deadline
     165        // against the current time since a new asynchronous operation
     166        // may have moved the deadline before this actor had a chance
     167        // to run.
     168        if (fTrigger.expires_at() > ba::deadline_timer::traits_type::now())
     169            return;
     170
     171        StartReadReport();
     172    }
     173
    151174    void StartReadReport()
    152175    {
    153         fMsgReceived = false;
    154 
    155176        PostMessage(string("rx\n"), 3);
     177
    156178        async_read_until(*this, fBuffer, '\n',
    157179                         boost::bind(&ConnectionSQM::HandleRead, this,
    158180                                     dummy::error, dummy::bytes_transferred));
    159181
    160         fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout));
     182        fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*2.5));
    161183        fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
    162184                                          this, dummy::error));
     
    175197public:
    176198    ConnectionSQM(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
    177         fIsVerbose(true)
     199        fIsVerbose(true), fTimeout(0), fTrigger(ioservice)
    178200    {
    179201        SetLogStream(&imp);
     
    193215    int GetState() const
    194216    {
    195         if (!is_open())
    196             return SQM::State::kDisconnected;
    197 
    198         return fMsgValid ? SQM::State::kValid : SQM::State::kConnected;
     217        return is_open() ? SQM::State::kConnected : SQM::State::kDisconnected;
    199218    }
    200219};
Note: See TracChangeset for help on using the changeset viewer.