// ************************************************************************** /** @class Connection @brief Maintains an ansynchronous TCP/IP client connection */ // ************************************************************************** #include "Connection.h" #include #include using namespace std; namespace ba = boost::asio; namespace bs = boost::system; namespace dummy = ba::placeholders; using boost::lexical_cast; using ba::ip::tcp; // -------- Abbreviations for starting async tasks --------- int Connection::Write(const Time &t, const char *txt, int qos) { if (fLog) return fLog->Write(t, txt, qos); return MessageImp::Write(t, txt, qos); } void Connection::AsyncRead(ba::mutable_buffers_1 buffers) { ba::async_read(*this, buffers, boost::bind(&Connection::HandleReceivedData, this, dummy::error, dummy::bytes_transferred)); } void Connection::AsyncWrite(ba::mutable_buffers_1 buffers) { ba::async_write(*this, buffers, boost::bind(&Connection::HandleSentData, this, dummy::error, dummy::bytes_transferred)); } void Connection::AsyncWait(ba::deadline_timer &timer, int millisec, void (Connection::*handler)(const bs::error_code&)) { // - The boost::asio::basic_deadline_timer::expires_from_now() // function cancels any pending asynchronous waits, and returns // the number of asynchronous waits that were cancelled. If it // returns 0 then you were too late and the wait handler has // already been executed, or will soon be executed. If it // returns 1 then the wait handler was successfully cancelled. // - If a wait handler is cancelled, the bs::error_code passed to // it contains the value bs::error::operation_aborted. timer.expires_from_now(boost::posix_time::milliseconds(millisec)); timer.async_wait(boost::bind(handler, this, dummy::error)); } void Connection::AsyncConnect(tcp::resolver::iterator iterator) { //cout << "Start async connect(...)" << endl; fConnectionStatus = 1; tcp::endpoint endpoint = *iterator; // AsyncConnect + Deadline async_connect(endpoint, boost::bind(&Connection::ConnectImp, this, ba::placeholders::error, ++iterator)); // We will get a "Connection timeout anyway" //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout); } // ------------------------ close -------------------------- // close from another thread void Connection::CloseImp(bool restart) { if (IsConnected()) { stringstream str; str << "Connection closed to " << URL() << ".";// << endl; Message(str); } // Close the connection fConnectionStatus = 0; close(); // Stop deadline counters fInTimeout.cancel(); fOutTimeout.cancel(); //fConnectTimeout.cancel(); if (!restart || IsConnecting()) return; // We need some timeout before reconnecting! // And we have to check if we are alreayd trying to connect // We shoudl wait until all operations in progress were canceled // Start trying to reconnect AsyncConnect(); } void Connection::PostClose(bool restart) { get_io_service().post(boost::bind(&Connection::CloseImp, this, restart)); } // ------------------------ write -------------------------- void Connection::HandleWriteTimeout(const bs::error_code &error) { // 125: Operation canceled if (error && error!=bs::error_code(125, bs::system_category)) { stringstream str; str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl; Error(str); } if (!is_open()) { // For example: Here we could schedule a new accept if we // would not want to allow two connections at the same time. return; } // Check whether the deadline has passed. We compare the deadline // against the current time since a new asynchronous operation // may have moved the deadline before this actor had a chance // to run. if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now()) return; Error("fOutTimeout has expired, writing data to "+URL()); CloseImp(); } void Connection::HandleSentData(const bs::error_code& error, size_t) { if (error) { stringstream str; str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl; Error(str); CloseImp(); return; } Message("Data successfully sent to "+URL()); // This is "thread" safe because SendMessage and HandleSentMessage // are serialized in the EventQueue. Note: Do not call these // functions directly from any other place then Handlers, use // PostMessage instead fOutQueue.pop_front(); if (fOutQueue.empty()) { // Queue went empty, remove deadline fOutTimeout.cancel(); return; } // AsyncWrite + Deadline AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/); AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); } void Connection::SendMessageImp(const vector &msg) { /* if (!fConnectionEstablished) { UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+"."); return; }*/ const bool first_message_in_queue = fOutQueue.empty(); // This is "thread" safe because SendMessage and HandleSentMessage // are serialized in the EventQueue. Note: Do not call these // functions directly from any other place then Handlers, use // PostMessage instead fOutQueue.push_back(msg); if (!first_message_in_queue) return; // AsyncWrite + Deadline AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/); AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); } void Connection::PostMessage(const vector &msg) { get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); } void Connection::PostMessage(const string &cmd, size_t max) { if (max==size_t(-1)) max = cmd.length()+1; vector msg(max); for (unsigned int i=0; i=1) Warn("Connection or connection attempt in progress. New endpoint only valid for next connection."); fAddress = addr; fPort = lexical_cast(port); } Connection::Connection(ba::io_service& ioservice, ostream &out) : MessageImp(out), tcp::socket(ioservice), fLog(0), fAddress("localhost"), fPort("5000"), fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), fConnectionStatus(0) { } Connection::Connection(ba::io_service& ioservice, const string &addr, int port) : tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(lexical_cast(port)), fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), fConnectionStatus(0) { } Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) : tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port), fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), fConnectionStatus(0) { }