Changeset 16769 for trunk/FACT++/src


Ignore:
Timestamp:
06/08/13 12:22:15 (11 years ago)
Author:
tbretz
Message:
There is no need to make the scheduling of the async write asynchronously, so I schedule the async write now directly; this also removed the send queue, which was primarily introduced to make the tranmission to the FTM serial, but this is not needed anymore; to keep track of the send queue size and for debugging, a counter for the messages in the send buffer has been introduced
Location:
trunk/FACT++/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Connection.cc

    r14577 r16769  
    103103
    104104    // Reset the connection status
     105    fQueueSize = 0;
    105106    fConnectionStatus = kDisconnected;
    106107
     
    108109    fInTimeout.cancel();
    109110    fOutTimeout.cancel();
    110 
    111     // Empty output queue
    112     fOutQueue.clear();
    113111
    114112    if (!restart || IsConnecting())
     
    168166void Connection::HandleSentData(const bs::error_code& error, size_t n)
    169167{
     168    if (error==ba::error::basic_errors::operation_aborted)
     169        return;
     170
    170171    if (error && error != ba::error::not_connected)
    171172    {
     
    183184        msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
    184185        Warn(msg);
    185     }
    186     else
    187     {
    188         if (fDebugTx)
    189         {
    190             ostringstream msg;
    191             msg << n << " bytes successfully sent to " << URL();
    192             Debug(msg);
    193         }
    194     }
    195 
    196     // This is "thread" safe because SendMessage and HandleSentMessage
    197     // are serialized in the EventQueue. Note: Do not call these
    198     // functions directly from any other place then Handlers, use
    199     // PostMessage instead
    200     if (!fOutQueue.empty())
    201         fOutQueue.pop_front();
    202 
    203     if (fOutQueue.empty())
    204     {
    205         // Queue went empty, remove deadline
     186
     187        return;
     188    }
     189
     190    if (--fQueueSize==0)
    206191        fOutTimeout.cancel();
    207         return;
    208     }
    209 
    210     // AsyncWrite + Deadline
    211     AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
     192
     193    if (fDebugTx)
     194    {
     195        ostringstream msg;
     196        msg << n << " bytes successfully sent to " << URL();
     197        Debug(msg);
     198    }
     199}
     200
     201void Connection::PostMessage(const void *ptr, size_t sz)
     202{
     203    // This function can be called from a different thread...
     204    if (!is_open())
     205        return;
     206
     207    // ... this is why we have to increase fQueueSize first
     208    fQueueSize++;
     209
     210    // ... and shift the deadline timer
     211    // This is not ideal, because if we are continously
     212    // filling the buffer, it will never timeout
    212213    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    213 }
    214 
    215 // It is important that when SendMessageImp is called, or to be more
    216 // precise boost::bind is called, teh data is copied!
    217 void Connection::SendMessageImp(const vector<char> msg)
    218 {
    219     /*
    220     if (!fConnectionEstablished)
    221     {
    222         UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
    223         return;
    224     }*/
    225 
    226     const bool first_message_in_queue = fOutQueue.empty();
    227 
    228     // This is "thread" safe because SendMessage and HandleSentMessage
    229     // are serialized in the EventQueue. Note: Do not call these
    230     // functions directly from any other place then Handlers, use
    231     // PostMessage instead
    232     fOutQueue.push_back(msg);
    233 
    234     if (!first_message_in_queue)
    235         return;
    236 
    237     // AsyncWrite + Deadline
    238     AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
    239     AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    240 }
    241 
    242 void Connection::PostMessage(const void *ptr, size_t max)
    243 {
    244     const vector<char> msg(reinterpret_cast<const char*>(ptr),
    245                            reinterpret_cast<const char*>(ptr)+max);
    246 
    247     get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     214
     215    // Now we can schedule the buffer to be sent
     216    AsyncWrite(ba::const_buffers_1(ptr, sz));
     217
     218    // If a socket is closed, all pending asynchronous
     219    // operation will be aborted.
    248220}
    249221
     
    253225        max = cmd.length()+1;
    254226
    255     vector <char>msg(max);
    256 
    257     copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
    258 
    259     PostMessage(msg);
     227    PostMessage(cmd.c_str(), min(cmd.length()+1, max));
    260228}
    261229
     
    306274            Info("Connection established to "+host+"...");
    307275
     276        fQueueSize = 0;
    308277        fConnectionStatus = kConnected;
    309278
     
    487456fLog(0), fVerbose(true), fDebugTx(false),
    488457fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    489 fConnectionStatus(kDisconnected)
    490 {
    491 }
     458fQueueSize(0), fConnectionStatus(kDisconnected)
     459{
     460}
  • trunk/FACT++/src/Connection.h

    r16090 r16769  
    3939    boost::asio::deadline_timer   fOutTimeout;
    4040    boost::asio::deadline_timer   fConnectionTimer;
    41     std::list<std::vector<char>> fOutQueue;
     41
     42    size_t fQueueSize;
    4243
    4344    ConnectionStatus_t fConnectionStatus;
     
    116117
    117118    // ------------------------ write --------------------------
    118     void SendMessageImp(const std::vector<char> msg);
    119119    void PostMessage(const void *msg, size_t s=0);
    120120    void PostMessage(const std::string &cmd, size_t s=-1);
     
    137137    virtual void HandleReadTimeout(const boost::system::error_code&) { }
    138138
    139     bool IsTxQueueEmpty() const { return fOutQueue.empty(); }
     139    bool IsTxQueueEmpty() const { return fQueueSize==0; /*fOutQueue.empty();*/ }
    140140
    141141    int IsClosed() const { return !is_open(); }
Note: See TracChangeset for help on using the changeset viewer.