Writing a simple C++ protobuf streaming client/server

10,250

Solution 1

First, the C++ protobuf API lacks built-in support for sending multiple protobuf messages over a single stream/connection. The Java API has it, but it still hasn't been added to the C++ version. Kenton Varda (creator of protobuf v2) was nice enough to post the C++ version. So you need that code to get support for multiple messages on your single connection.

Then, you can create your client/server using boost::asio . Don't try to use the istream/ostream style interface asio provides; it is easier to wrap that and create the stream types (ZeroCopyInputStream/ZeroCopyOutputStream) required by protobuf, but it doesn't work. I don't completely understand why, but this answer by Fulkerson talks about the brittle nature of trying to do it. It also provides sample code to adapt the raw sockets into the types we need.

Putting all of this together along with a basic boost::asio tutorial, here are the client and server, followed by the supporting code. We are sending multiple instances of a simple protobuf class called persistence::MyMessage located in MyMessage.pb.h. Replace it with your own.

Client:

#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    const char* hostname = "127.0.0.1";
    const char* port = "27015";
    boost::asio::io_service io_service;
    tcp::resolver resolver(io_service);
    tcp::resolver::query query(hostname, port);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::socket socket(io_service);
    boost::asio::connect(socket, endpoint_iterator);
    AsioInputStream<tcp::socket> ais(socket);
    CopyingInputStreamAdaptor cis_adp(&ais);
    for (;;)
    {
        persistence::MyMessage myMessage;
        google::protobuf::io::readDelimitedFrom(&cis_adp, &myMessage);
    }
    return 0;
}

Server:

#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 27015));
    for (;;)
    {
        tcp::socket socket(io_service);
        acceptor.accept(socket);
        AsioOutputStream<boost::asio::ip::tcp::socket> aos(socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
        CopyingOutputStreamAdaptor cos_adp(&aos);
        int i = 0;
        do {
            ++i;
            persistence::MyMessage myMessage;
            myMessage.set_myString("hello world");
            myMessage.set_myInt(i);
            google::protobuf::io::writeDelimitedTo(metricInfo, &cos_adp);
            // Now we have to flush, otherwise the write to the socket won't happen until enough bytes accumulate
            cos_adp.Flush(); 
        } while (true);
    }
    return 0;
}

Here are the supporting files courtesy of Kenton Varda:

ProtobufHelpers.h

#pragma once
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/message_lite.h>
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput);

            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message);
        }
    }
}

and

ProtobufHelpers.cpp

#include "ProtobufHelpers.h"
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
                // We create a new coded stream for each message.  Don't worry, this is fast.
                google::protobuf::io::CodedOutputStream output(rawOutput);

                // Write the size.
                const int size = message.ByteSize();
                output.WriteVarint32(size);

                uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
                if (buffer != NULL) {
                    // Optimization:  The message fits in one buffer, so use the faster
                    // direct-to-array serialization path.
                    message.SerializeWithCachedSizesToArray(buffer);
                }
                else {
                    // Slightly-slower path when the message is multiple buffers.
                    message.SerializeWithCachedSizes(&output);
                    if (output.HadError()) return false;
                }

                return true;
            }

            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message) {
                // We create a new coded stream for each message.  Don't worry, this is fast,
                // and it makes sure the 64MB total size limit is imposed per-message rather
                // than on the whole stream.  (See the CodedInputStream interface for more
                // info on this limit.)
                google::protobuf::io::CodedInputStream input(rawInput);

                // Read the size.
                uint32_t size;
                if (!input.ReadVarint32(&size)) return false;

                // Tell the stream not to read beyond that size.
                google::protobuf::io::CodedInputStream::Limit limit =
                    input.PushLimit(size);

                // Parse the message.
                if (!message->MergeFromCodedStream(&input)) return false;
                if (!input.ConsumedEntireMessage()) return false;

                // Release the limit.
                input.PopLimit(limit);

                return true;
            }
        }
    }
}

and courtesy of Fulkerson:

AsioAdapting.h

#pragma once
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
    AsioInputStream(SyncReadStream& sock);
    int Read(void* buffer, int size);
private:
    SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if (!ec) {
        return bytes_read;
    }
    else if (ec == boost::asio::error::eof) {
        return 0;
    }
    else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
    AsioOutputStream(SyncWriteStream& sock);
    bool Write(const void* buffer, int size);
private:
    SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

Solution 2

I'd recommend using gRPC. It supports "streaming" requests in which the client and server can send multiple messages in either direction over time as part of a single logical request, which should suit your needs. With gRPC a lot of the nitty-gritty setup is taken care of for you, you have extensive documentation and tutorials to follow, TLS encryption is built-in, you have cross-language support, you can easily add new kinds of requests and parallel streams, etc.

Share:
10,250
aggieNick02
Author by

aggieNick02

Updated on June 15, 2022

Comments

  • aggieNick02
    aggieNick02 almost 2 years

    I want to use protobuf to send messages back and forth between a client and server. In my case, I want to send an arbitrary number of protobuf messages from the server to the client. How can I build this quickly in C++?

    Note: I wrote this question along with my answer after pooling a really useful Kenton Varda answer and Fulkerson answer on stackoverflow. Others have asked similar questions and hit similar roadblocks - see here, here, and here.

    I'm new with protobuf and asio so feel free to correct/suggest improvements, or provide your own answer.

  • Vinay Shukla
    Vinay Shukla about 6 years
    Hello Kenton, Need your help with gRPC. I wish to understand what do you mean by multiple messages in either direction is this message sent serially or parallely on different connections spawned using threads ?
  • Kenton Varda
    Kenton Varda about 6 years
    @VinayShukla Look for "streaming" in the gRPC documentation. Sorry, I don't have any more information, I haven't used it myself.
  • Vinay Shukla
    Vinay Shukla about 6 years
    Thanks for your help, I would highly appreciate if you can comment on this question of mine please. stackoverflow.com/questions/49852761/…
  • GeminiDakota
    GeminiDakota almost 3 years
    Nice write up - It looks like there is a "delimited_message_util.h" which now adds the ability to "SerializeDelimitedToOstream". It basically puts the size of the message before each message. github.com/protocolbuffers/protobuf/blob/master/src/google/…