// ************************************************************************** /** @class Connection @brief Maintains an ansynchronous TCP/IP client connection */ // ************************************************************************** #include "ConnectionUSB.h" #include using namespace std; namespace ba = boost::asio; namespace bs = boost::system; namespace dummy = ba::placeholders; using ba::serial_port_base; //#define DEBUG_TX // -------- Abbreviations for starting async tasks --------- int ConnectionUSB::Write(const Time &t, const string &txt, int qos) { if (fLog) return fLog->Write(t, txt, qos); return MessageImp::Write(t, txt, qos); } void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter) { ba::async_read(*this, buffers, boost::bind(&ConnectionUSB::HandleReceivedData, this, dummy::error, dummy::bytes_transferred, type, counter)); } void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers) { ba::async_write(*this, buffers, boost::bind(&ConnectionUSB::HandleSentData, this, dummy::error, dummy::bytes_transferred)); } void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec, void (ConnectionUSB::*handler)(const bs::error_code&)) { // - The boost::asio::basic_deadline_timer::expires_from_now() // function cancels any pending asynchronous waits, and returns // the number of asynchronous waits that were cancelled. If it // returns 0 then you were too late and the wait handler has // already been executed, or will soon be executed. If it // returns 1 then the wait handler was successfully cancelled. // - If a wait handler is cancelled, the bs::error_code passed to // it contains the value bs::error::operation_aborted. timer.expires_from_now(boost::posix_time::milliseconds(millisec)); timer.async_wait(boost::bind(handler, this, dummy::error)); } // ------------------------ close -------------------------- // close from another thread void ConnectionUSB::CloseImp(bool restart) { if (IsConnected()) { ostringstream str; str << "Closing connection to " << URL() << "."; Info(str); } // Close possible open connections bs::error_code ec; cancel(ec); if (ec) Error("Cancel async requests on "+URL()+": "+ec.message()); if (IsConnected()) { close(ec); if (ec) Error("Closing "+URL()+": "+ec.message()); } // Reset the connection status fConnectionStatus = kDisconnected; // Stop deadline counters fInTimeout.cancel(); fOutTimeout.cancel(); // Empty output queue fOutQueue.clear(); if (!restart || IsConnecting()) return; // We need some timeout before reconnecting! // And we have to check if we are alreayd trying to connect // We shoudl wait until all operations in progress were canceled // Start trying to reconnect Connect(); } void ConnectionUSB::PostClose(bool restart) { get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart)); } // ------------------------ write -------------------------- void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error) { if (error==ba::error::basic_errors::operation_aborted) return; // 125: Operation canceled (bs::error_code(125, bs::system_category)) if (error) { ostringstream str; str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl; Error(str); CloseImp(false); return; } if (!is_open()) { // For example: Here we could schedule a new accept if we // would not want to allow two connections at the same time. return; } // Check whether the deadline has passed. We compare the deadline // against the current time since a new asynchronous operation // may have moved the deadline before this actor had a chance // to run. if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now()) return; Error("fOutTimeout has expired, writing data to "+URL()); CloseImp(false); } void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n) { if (error && error != ba::error::not_connected) { ostringstream str; str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl; Error(str); CloseImp(false); return; } if (error == ba::error::not_connected) { ostringstream msg; msg << n << " bytes could not be sent to " << URL() << " due to missing connection."; Warn(msg); } else { #ifdef DEBUG_TX ostringstream msg; msg << n << " bytes successfully sent to " << URL(); Message(msg); #endif } HandleTransmittedData(n); // This is "thread" safe because SendMessage and HandleSentMessage // are serialized in the EventQueue. Note: Do not call these // functions directly from any other place then Handlers, use // PostMessage instead fOutQueue.pop_front(); if (fOutQueue.empty()) { // Queue went empty, remove deadline fOutTimeout.cancel(); return; } // AsyncWrite + Deadline AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/); AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout); } // It is important that when SendMessageImp is called, or to be more // precise boost::bind is called, teh data is copied! void ConnectionUSB::SendMessageImp(const vector msg) { /* if (!fConnectionEstablished) { UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+"."); return; }*/ const bool first_message_in_queue = fOutQueue.empty(); // This is "thread" safe because SendMessage and HandleSentMessage // are serialized in the EventQueue. Note: Do not call these // functions directly from any other place then Handlers, use // PostMessage instead fOutQueue.push_back(msg); if (!first_message_in_queue) return; // AsyncWrite + Deadline AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/); AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout); } void ConnectionUSB::PostMessage(const void *ptr, size_t max) { const vector msg(reinterpret_cast(ptr), reinterpret_cast(ptr)+max); get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg)); } void ConnectionUSB::PostMessage(const string &cmd, size_t max) { if (max==size_t(-1)) max = cmd.length()+1; vector msg(max); copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin()); PostMessage(msg); } void ConnectionUSB::Connect() { fConnectionStatus = kConnecting; bs::error_code ec; open(URL(), ec); ostringstream msg; msg << "Connecting to " << URL() << "... "; if (ec) msg << " " << ec.message() << " (" << ec << ")"; else msg << "success."; if (ec) { Error(msg); fConnectionStatus = kDisconnected; return; } Info(msg); try { set_option(fBaudRate); set_option(fCharacterSize); set_option(fParity); set_option(fStopBits); set_option(fFlowControl); } catch (const bs::system_error &erc) { Error(string("Setting connection options: ")+erc.what()); // CLOSE return; } fConnectionStatus = kConnected; ConnectionEstablished(); } void ConnectionUSB::SetEndpoint(const string &addr) { if (fConnectionStatus>=1) Warn("Connection or connection attempt in progress. New endpoint only valid for next connection."); fAddress = "/dev/"+addr; } ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) : MessageImp(out), ba::serial_port(ioservice), fLog(0), fBaudRate(115200), fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one), fFlowControl(flow_control::none), fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionStatus(kDisconnected) { }