Changeset 16768


Ignore:
Timestamp:
06/08/13 12:21:51 (11 years ago)
Author:
tbretz
Message:
There is no need to make the scheduling of the async write asynchronously, so I schedule the async write now directly; this also removed the send queue, which was primarily introduced to make the tranmission to the FTM serial, but this is not needed anymore; to keep track of the send queue size and for debugging, a counter for the messages in the send buffer has been introduced
Location:
trunk/FACT++/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/ConnectionUSB.cc

    r16566 r16768  
    103103
    104104    // Reset the connection status
     105    fQueueSize = 0;
    105106    fConnectionStatus = kDisconnected;
    106 
    107     // Empty output queue
    108     fOutQueue.clear();
    109107
    110108#ifdef DEBUG
     
    198196        return;
    199197
    200     Error("fOutTimeout has expired, writing data to "+URL()+" ["+to_string(fOutQueue.size())+"]");
     198    Error("fOutTimeout has expired, writing data to "+URL());
    201199
    202200    CloseImp(-1);
     
    205203void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
    206204{
     205    if (error==ba::error::basic_errors::operation_aborted)
     206        return;
     207
    207208    if (error && error != ba::error::not_connected)
    208209    {
     
    220221        msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
    221222        Warn(msg);
    222     }
    223     else
    224     {
     223        return;
     224    }
     225
     226    if (--fQueueSize==0)
     227        fOutTimeout.cancel();
     228
    225229#ifdef DEBUG_TX
    226         ostringstream msg;
    227         msg << n << " bytes successfully sent to " << URL();
    228         Message(msg);
     230    ostringstream msg;
     231    msg << n << " bytes successfully sent to " << URL();
     232    Message(msg);
    229233#endif
    230     }
    231234
    232235#ifdef DEBUG
     
    239242
    240243    HandleTransmittedData(n);
    241 
    242     // This is "thread" safe because SendMessage and HandleSentMessage
    243     // are serialized in the EventQueue. Note: Do not call these
    244     // functions directly from any other place then Handlers, use
    245     // PostMessage instead
    246     if (!fOutQueue.empty())
    247         fOutQueue.pop_front();
    248 
    249     if (fOutQueue.empty())
    250     {
    251         // Queue went empty, remove deadline
    252         fOutTimeout.cancel();
    253         return;
    254     }
    255 
    256      // AsyncWrite + Deadline
    257     AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
     244}
     245
     246void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
     247{
     248    // This function can be called from a different thread...
     249    if (!is_open())
     250        return;
     251
     252    // ... this is why we have to increase fQueueSize first
     253    fQueueSize++;
     254
     255    // ... and shift the deadline timer
     256    // This is not ideal, because if we are continously
     257    // filling the buffer, it will never timeout
    258258    AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
    259 }
    260 
    261 // It is important that when SendMessageImp is called, or to be more
    262 // precise boost::bind is called, teh data is copied!
    263 void ConnectionUSB::SendMessageImp(const vector<uint8_t> msg)
    264 {
    265     /*
    266     if (!fConnectionEstablished)
    267     {
    268         UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
    269         return;
    270     }*/
    271 
    272     const bool first_message_in_queue = fOutQueue.empty();
    273 
    274     // This is "thread" safe because SendMessage and HandleSentMessage
    275     // are serialized in the EventQueue. Note: Do not call these
    276     // functions directly from any other place then Handlers, use
    277     // PostMessage instead
    278     fOutQueue.push_back(msg);
    279 
    280     if (!first_message_in_queue)
    281         return;
    282 
    283 #ifdef DEBUG
    284     ofstream fout("send.txt", ios::app);
    285     fout << Time() << ": ";
    286     for (unsigned int i=0; i<msg.size(); i++)
    287         fout << hex << setfill('0') << setw(2) << (uint32_t)msg[i];
    288     fout << endl;
    289 #endif
    290 
    291     // AsyncWrite + Deadline
    292     AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
    293     AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
    294 }
    295 
    296 void ConnectionUSB::PostMessage(const void *ptr, size_t max)
    297 {
    298     const vector<uint8_t> msg(reinterpret_cast<const uint8_t*>(ptr),
    299                               reinterpret_cast<const uint8_t*>(ptr)+max);
    300 
    301     get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
     259
     260    // Now we can schedule the buffer to be sent
     261    AsyncWrite(ba::const_buffers_1(ptr, sz));
    302262}
    303263
     
    307267        max = cmd.length()+1;
    308268
    309     vector <char>msg(max);
    310 
    311     copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
    312 
    313     PostMessage(msg);
     269    PostMessage(cmd.c_str(), min(cmd.length()+1, max));
    314270}
    315271
     
    349305    }
    350306
     307    fQueueSize = 0;
    351308    fConnectionStatus = kConnected;
    352309
     
    369326fFlowControl(flow_control::hardware),
    370327fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
    371 fConnectionStatus(kDisconnected)
    372 {
    373 }
     328fQueueSize(0), fConnectionStatus(kDisconnected)
     329{
     330}
  • trunk/FACT++/src/ConnectionUSB.h

    r16090 r16768  
    3838    boost::asio::deadline_timer   fOutTimeout;
    3939    boost::asio::deadline_timer   fConnectTimeout;
    40     std::list<std::vector<uint8_t>> fOutQueue;
     40
     41    size_t fQueueSize;
    4142
    4243    ConnectionStatus_t fConnectionStatus;
     
    8182
    8283    // ------------------------ write --------------------------
    83     void SendMessageImp(const std::vector<uint8_t> msg);
    8484    void PostMessage(const void *msg, size_t s=0);
    8585    void PostMessage(const std::string &cmd, size_t s=-1);
Note: See TracChangeset for help on using the changeset viewer.