Boost async TCP client

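boost::asio::ip::tcp::socket::async_read_some, as the name suggests, is not guaranteed to read the complete data; it completes as soon as some bytes are available. It sets the error object to boost::asio::error::eof once the peer has closed the connection, that is, when the client has finished writing.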

The error you are getting comes from this part of the server code:

        if (!error)
        {
            ...
        }
        else
        {
            delete this;
        }

In the else block, you assume that any error is a failure and close the connection. That is not always the case. Before the else, you need to check for error == boost::asio::error::eof, which only means the client has finished sending.

Apart from this, the read handler should keep appending whatever it reads to a buffer until it sees error == boost::asio::error::eof. Only then should you validate the received data and write the reply back to the client.

Take a look at the HTTP server 1, 2, and 3 implementations in the examples section of the Asio documentation.

Update: Answer to updated question

You have a thread synchronization issue in the updated code.

  1. msgQueue is simultaneously accessed from two or more threads without any lock.
  2. Read and write on the same socket can be called simultaneously.
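As a point of comparison for item 1, a queue guarded by a lock could look like the sketch below. It is independent of Asio and assumes a C++11 compiler; LockedMsgQueue is a hypothetical name, not something from the question's code.

```cpp
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical wrapper: every access to the underlying queue
// happens while holding the mutex.
class LockedMsgQueue {
public:
    void push(std::string msg)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(msg));
    }

    // Non-blocking pop: returns false if the queue is empty.
    bool try_pop(std::string& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty())
            return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::queue<std::string> queue_;
};
```

With something like this, the input thread could call push while the sending side calls try_pop without racing on the container. It does not address item 2, which concerns the socket itself.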

If I understood your problem correctly, you want to:

  1. Take user input and send it to the server.
  2. Keep receiving the server's responses at the same time.

You can use two boost::asio::io_service::strand objects for the two tasks. When using Asio, strands are the way to synchronize your tasks. Asio makes sure that handlers posted through the same strand never run concurrently; they are executed one after another.

  1. In strand1, post a send task that looks like: read_user_input -> send_to_server -> handle_send -> read_user_input

  2. In strand2, post a read task that looks like: read_some -> handle_read -> read_some

This makes sure msgQueue is not accessed from two threads at once, provided it is only touched by handlers running in strand1. Use two sockets, one for reading from the server and one for writing to it, so that a read and a write are never issued on the same socket simultaneously.

Author: Torrius

Updated on June 04, 2022

Comments

  • Torrius, almost 2 years ago

    I've just started working with Boost. I'm writing a TCP client-server application with async sockets.

    The task is the following:

    1. The client sends a number to the server.
    2. The client can send another number before receiving the server's answer.
    3. The server receives a number, does some computation with it, and sends the result back to the client.
    4. Multiple clients can be connected to the server.

    What works so far:

    • sending a number from the client to the server
    • the server receives the number in the current thread and computes right in the OnReceive handler (I know this is bad, but how should I start a new thread to do the computing in parallel?)
    • the server sends the answer back, but the client has already disconnected

    How can I allow the client to read numbers from the keyboard and wait for an answer from the server at the same time?

    And why does my client not wait for the answer from the server?

    The client code:

    using boost::asio::ip::tcp;
    
    class TCPClient
    {
        public:
            TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
            void Close();
    
        private:
            boost::asio::io_service& m_IOService;
            tcp::socket m_Socket;
    
            string m_SendBuffer;
            static const size_t m_BufLen = 100;
            char m_RecieveBuffer[m_BufLen*2];
    
            void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
            void OnReceive(const boost::system::error_code& ErrorCode);
            void OnSend(const boost::system::error_code& ErrorCode);
            void DoClose();
    };
    
    TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
    : m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")
    {
        tcp::endpoint EndPoint = *EndPointIter;
    
        m_Socket.async_connect(EndPoint,
            boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
    }
    
    void TCPClient::Close()
    {
        m_IOService.post(
            boost::bind(&TCPClient::DoClose, this));
    }
    void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
    {
        cout << "OnConnect..." << endl;
        if (ErrorCode == 0)
        {
            cin >> m_SendBuffer;
            cout << "Entered: " << m_SendBuffer << endl;
            m_SendBuffer += "\0";
    
            m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
                boost::bind(&TCPClient::OnSend, this,
                boost::asio::placeholders::error));
        } 
        else if (EndPointIter != tcp::resolver::iterator())
        {
            m_Socket.close();
            tcp::endpoint EndPoint = *EndPointIter;
    
            m_Socket.async_connect(EndPoint, 
                boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
        }
    }
    
    void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
    {
        cout << "receiving..." << endl;
        if (ErrorCode == 0)
        {
            cout << m_RecieveBuffer << endl;
    
            m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
                boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
        } 
        else 
        {
            cout << "ERROR! OnReceive..." << endl;
            DoClose();
        }
    }
    
    void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
    {
        cout << "sending..." << endl;
        if (!ErrorCode)
        {
            cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
            m_SendBuffer = "";
    
            m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
                boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
        }
        else
        {
            cout << "OnSend closing" << endl;
            DoClose();
        }
    
    }
    
    void TCPClient::DoClose()
    {
        m_Socket.close();
    }
    
    int main()
    {
        try 
        {
            cout << "Client is starting..." << endl;
            boost::asio::io_service IO_Service;
    
            tcp::resolver Resolver(IO_Service);
    
            string port = "13";
            tcp::resolver::query Query("127.0.0.1", port);
    
            tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);
    
            TCPClient Client(IO_Service, EndPointIterator);
    
            cout << "Client is started!" << endl;
    
            cout << "Enter a query string " << endl;
    
            boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));
    
            Client.Close();
            ClientThread.join();
        } 
        catch (exception& e)
        {
            cerr << e.what() << endl;
        }
    
        cout << "\nClosing";
        getch();
    }
    

    Here is the output from the console:

    Client is starting...
    Client is started!
    OnConnect...
    12
    Entered: 12
    sending...
    "12" has been sent
    receiving...
    ERROR! OnReceive...
    
    Closing
    

    Server part:

    class Session
    {
        public:
            Session(boost::asio::io_service& io_service)
                : socket_(io_service)
            {
                dataRx[0] = '\0';
                dataTx[0] = '\0';
            }
    
            tcp::socket& socket()
            {
                return socket_;
            }
    
            void start()
            {
                socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                    boost::bind(&Session::handle_read, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
            }
    
            void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
            {
                cout << "reading..." << endl;
                cout << "Data: " << dataRx << endl;
    
                if (!error)
                {
                    if (!isValidData())
                    {
                        cout << "Bad data!" << endl;
                        sprintf(dataTx, "Bad data!\0");
                        dataRx[0] = '\0';
                    }
                    else
                    {
                        sprintf(dataTx, getFactorization().c_str());
                        dataRx[0] = '\0';
                    }
    
                    boost::asio::async_write(socket_,
                        boost::asio::buffer(dataTx, max_length*2),
                        boost::bind(&Session::handle_write, this,
                        boost::asio::placeholders::error));
                }
                else
                {
                    delete this;
                }
            }
    
            void handle_write(const boost::system::error_code& error)
            {
                cout << "writing..." << endl;
                if (!error)
                {
                    cout << "dataTx sent: " << dataTx << endl;
                    dataTx[0] = '\0';
    
                    socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                        boost::bind(&Session::handle_read, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
                }
                else
                {
                    delete this;
                }
            }
    
            string getFactorization() const
            {
                //Do something
            }
    
            bool isValidData()
            {
                locale loc; 
                for (int i = 0; i < strlen(dataRx); i++)
                    if (!isdigit(dataRx[i],loc))
                        return false;
    
                return true;
            }
    
        private:
            tcp::socket socket_;
            static const size_t max_length = 100;
            char dataRx[max_length];
            char dataTx[max_length*2];
    };
    
    class Server
    {
        public:
            Server(boost::asio::io_service& io_service, short port)
                : io_service_(io_service),
                acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
            {
                Session* new_session = new Session(io_service_);
                acceptor_.async_accept(new_session->socket(),
                    boost::bind(&Server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
            }
    
            void handle_accept(Session* new_session, const boost::system::error_code& error)
            {
                if (!error)
                {
                    new_session->start();
                    new_session = new Session(io_service_);
                    acceptor_.async_accept(new_session->socket(),
                        boost::bind(&Server::handle_accept, this, new_session,
                        boost::asio::placeholders::error));
                }
                else
                {
                    delete new_session;
                }
            }
    
        private:
            boost::asio::io_service& io_service_;
            tcp::acceptor acceptor_;
    };
    
    int main(int argc, char* argv[])
    {
        cout << "Server is runing..." << endl;
        try
        {
            boost::asio::io_service io_service;
    
            int port = 13;
            Server s(io_service, port);
            cout << "Server is run!" << endl;
            io_service.run();
        }
        catch (boost::system::error_code& e)
        {
            std::cerr << e << "\n";
        }
        catch (std::exception& e)
        {
            std::cerr << "Exception: " << e.what() << "\n";
        }
    
        return 0;
    }
    

    Server's output:

    Server is runing...
    Server is run!
    reading...
    Data: 12
    writing...
    dataTx sent: 13    //just send back received ++number
    reading...
    Data:
    

    Your help would be much appreciated.

    ========

    Added

    OK, I understand. But the check ErrorCode == boost::asio::error::eof does not work. What have I done wrong?

    else if (ErrorCode == boost::asio::error::eof)
    {
        cout << "boost::asio::error::eof in OnReceive!" << endl;
    }
    else 
    {
        cout << "ERROR! OnReceive..." << ErrorCode << endl;
        DoClose();
    }
    

    The printout is ERROR! OnReceive...system:10009, so it seems my comparison is incorrect.

    ========

    Added

    I found the root cause. I've started using async_receive (instead of async_read_some) and swapped the lines in main to:

    ClientThread.join();
    Client.Close();
    

    Now it works fine!

    Now I'm trying to read from and write to the socket at the same time (because the client should be able to send additional requests before the answer from the server is received).

    In the OnConnect function I create Boost threads:

    boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
    boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
    boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));
    

    with the implementation:

    void TCPClient::startReceiving()
    {
        cout << "receiving..." << endl;
        m_RecieveBuffer[0] = '\0';
        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
        cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
    }
    
    void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
    {
        cout << "receiving..." << endl;
        if (ErrorCode == 0)
        {
            cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
    
            m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
                boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
        }
        else 
        {
            cout << "ERROR! receivingLoop..." << ErrorCode << endl;
            DoClose();
        }
    }
    
    void TCPClient::addMsgLoop()
    {
        while (true)
        {
            string tmp;
            cin >> tmp;
    
            cout << "Entered: " << tmp << endl;
            tmp += "\0";
    
            try
            {
                msgQueue.push(tmp);
            }
            catch(exception &e)
            {
                cerr << "Canno add msg to send queue... " << e.what() << endl;
            }
        }
    }
    

    The issue is the same with both the receive and send threads: a runtime error (write access violation somewhere in the Boost libraries).

    void TCPClient::startReceiving()
    {
         ...
         m_Socket.async_receive(); //runtime error here
    }
    

    In the sequential version everything works fine (but I don't know how to implement multiple sends before an answer arrives). Can anybody tell me how to fix the issue or how to implement this another way? Maybe pooling can help, but I'm not sure that it is a good way.

  • Torrius, over 11 years ago
    error == boost::asio::error::eof is not working (see question below)
  • Vikas, over 11 years ago
    @Torrius, did you add this to both the server and the client? From the code you posted, it seems you've added it only to the client's read handler. Also, please edit your original question to add further comments. Don't add those as another answer.
  • Vikas, over 11 years ago
    @Torrius, I updated my answer for your edited question. Please create a new question if it is not related to the original question.