Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?


Solution 1

I'm a bit late to the party here, but the implementations below include some optimizations missing from the other answers, and they will not fail after 64MB of input (they still enforce the 64MB limit on each individual message, just not on the whole stream).

(I am the author of the C++ and Java protobuf libraries, but I no longer work for Google. Sorry that this code never made it into the official lib. This is what it would look like if it had.)

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}

Solution 2

Okay, so I haven't been able to find top-level C++ functions implementing what I need, but some spelunking through the Java API reference turned up the following, inside the MessageLite interface:

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

So the Java size prefix is a (Protocol Buffers) varint!

Armed with that information, I went digging through the C++ API and found the CodedStream header, which has these:

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

Using those, I should be able to roll my own C++ functions that do the job.

They should really add this to the main Message API, though; it's missing functionality, considering Java has it, as does Marc Gravell's excellent protobuf-net C# port (via SerializeWithLengthPrefix and DeserializeWithLengthPrefix).

Solution 3

I solved the same problem using CodedOutputStream/ArrayOutputStream to write the message (with the size) and CodedInputStream/ArrayInputStream to read the message (with the size).

For example, the following pseudo-code writes the message size followed by the message itself:

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

When writing you should also check that your buffer is large enough to fit the message (including the size). And when reading, you should check that your buffer contains a whole message (including the size).

It definitely would be handy if they added convenience methods to the C++ API similar to those provided by the Java API.

Solution 4

IstreamInputStream is very fragile to EOFs and other errors that easily occur when it is used together with std::istream. After such an error, the protobuf streams are permanently damaged and any data already in the buffer is destroyed. There is proper support in protobuf for reading from traditional streams.

Implement google::protobuf::io::CopyingInputStream and use it together with CopyingInputStreamAdaptor. Do the same for the output variants.

In practice, a parsing call ends up in google::protobuf::io::CopyingInputStream::Read(void* buffer, int size), where a buffer is provided. The only thing left to do is to read into it somehow.

Here's an example for use with Asio synchronous streams (SyncReadStream/SyncWriteStream):

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

Usage:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is an instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
}

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!protoMessage.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);

Solution 5

I ran into the same issue in both C++ and Python.

For the C++ version, I used a mix of the code Kenton Varda posted in this thread and the code from the pull request he sent to the protobuf team (because the version posted here doesn't handle EOF, while the one he sent to GitHub does).

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

And here is my Python 2 implementation:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

# I had to implement this because the tools in google.protobuf.internal.decoder
# read from a buffer, not from a file-like object
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while True:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

It might not be the best-looking code and I'm sure it can be refactored a fair bit, but at least it should show you one way to do it.

Now the big problem: It's SLOW.

Even when using the C++ implementation of python-protobuf, it's an order of magnitude slower than pure C++. In a benchmark where I read 10M protobuf messages of ~30 bytes each from a file, it takes ~0.9s in C++ and 35s in Python.

One way to make it a bit faster would be to re-implement the varint decoder so that it reads from the file and decodes in one pass, instead of reading and then decoding separately as this code currently does (profiling shows that a significant amount of time is spent in the varint encoder/decoder). But needless to say, that alone is not enough to close the gap between the Python version and the C++ version.

Any idea to make it faster is very welcome :)

Author: tzaman

Updated on July 05, 2022

Comments

  • tzaman
    tzaman almost 2 years

    I'm trying to read / write multiple Protocol Buffers messages from files, in both C++ and Java. Google suggests writing length prefixes before the messages, but there's no way to do that by default (that I could see).

    However, the Java API in version 2.1.0 received a set of "Delimited" I/O functions which apparently do that job:

    parseDelimitedFrom
    mergeDelimitedFrom
    writeDelimitedTo
    

    Are there C++ equivalents? And if not, what's the wire format for the size prefixes the Java API attaches, so I can parse those messages in C++?


    Update:

    These now exist in google/protobuf/util/delimited_message_util.h as of v3.3.0.

  • tzaman
    tzaman about 14 years
    Not the same thing; protocol buffers is a binary format, the "Delimited" functions actually just prepend a size. I'd need to know the format of the size prefix.
  • Yukiko
    Yukiko about 14 years
    Yes. This is the way I solved this problem. I added another answer with some sample pseudo code for writing a message.
  • tzaman
    tzaman about 14 years
    I'll be using an underlying OstreamOutputStream, so the length-checking won't be necessary, but thanks for the answer. :) In your case, I'd probably go with setting the bufLength to protoMessage.ByteSize() plus some extra for the size prefix.
  • tzaman
    tzaman about 10 years
    Hey, thanks for the answer Kenton! I'll switch accepted to this one over my own. Although I suspect the best answer at this point is to use Cap'n Proto instead? :)
  • tzaman
    tzaman about 10 years
    Also - why not get this merged in to the official protobuf lib on code.google?
  • Kenton Varda
    Kenton Varda about 10 years
    As an official patch it would need some more work, like unit tests, maybe a better error recovery story, etc., and between Cap'n Proto and Sandstorm.io I just don't have time. :/ If someone wants to claim this as their own and push it upstream feel free. The two functions should probably become methods on MessageLite. You should probably discuss with the current maintainers before doing any work as they might have their own plans.
  • mukunda
    mukunda over 9 years
    what about creating an OstreamOutputStream every time (wrapping a std::ostream arg), is that fast too?
  • Kenton Varda
    Kenton Varda over 9 years
    @mukunda - I think OstreamOutputStream is an instance of CopyingOutputStream which allocates a buffer, therefore it is slow to construct and destruct. Also, on destruction, any unread data currently in the buffer is simply discarded (not rewound), so you may be left at an unspecified offset in the stream.
  • Kenton Varda
    Kenton Varda about 9 years
    I just noticed that the second part of my previous comment is wrong. The concern about unread data in the buffer only applies to input streams (e.g. IstreamInputStream), not output streams. The first part of the comment still applies, though.
  • Jim Oldfield
    Jim Oldfield over 8 years
    A downside to using a varint header is that it's tricky to specify to an asynchronous API (like ASIO) to notify you when a whole header has been read. Using fixed sized ints is trivial: you just ask to wait until 4 bytes have been received (with asio::transfer_at_least). For varints you'd want to optimise for the common case that a whole header is read at once, while avoiding quadratic behaviour if someone sends an infinite stream of bytes with the high bit set. Also, to me, all this logic feels a bit too high level for socket reading code.
  • Jim Oldfield
    Jim Oldfield over 8 years
    Unrelated to my last comment: If someone did integrate this into the main protobuf project, it would be great if support were also included in protoc's --encode/decode switches (with multiple messages) for reading and writing binary protobuf files.
  • Kenton Varda
    Kenton Varda over 8 years
    Oh yeah, I should probably mention I submitted a pull request to protobuf to add these functions three months ago... hasn't been accepted yet, though: github.com/google/protobuf/pull/710
  • Moncef Mechri
    Moncef Mechri over 8 years
    @KentonVarda What would be the python equivalent of those functions?
  • Kenton Varda
    Kenton Varda over 8 years
    @fireboot Sorry, I didn't write the Python library so I'm not as familiar with it. I'd have to dig around for a while to figure it out, and unfortunately I don't have time. :/ I could probably verify code produced by someone else, though.
  • Moncef Mechri
    Moncef Mechri over 8 years
    @KentonVarda I will give it a go in the next days and post it here
  • Moncef Mechri
    Moncef Mechri over 8 years
    @KentonVarda I just posted my version in this thread. Comments welcome :)
  • abergmeier
    abergmeier over 8 years
    There is a general question why there are different implementations for encoding/decoding in Java/Python/C++. I do not understand why there is no base implementation in C++, which simply gets called in Java/Python...
  • tzaman
    tzaman about 8 years
    That would only be true if the size prefix included its own size, which it doesn't. If you do this you'll end up not reading the whole message.
  • vqqnda1
    vqqnda1 about 8 years
    It is precisely because the size prefix does not include its own size that this is a problem.
  • tzaman
    tzaman about 8 years
    The size prefix contains exactly the size of the message, which follows it. The code then proceeds to read that many bytes, which contain the entire message. Where's the problem?
  • vqqnda1
    vqqnda1 about 8 years
    Both the original code and the version I posted work fine and it turns out this wasn't my problem after all. My issue was CodedInputStream unexpectedly consuming all the data from the source buffer even though a limit had been set. I was trying to determine how much data was left over and CodedInputStream makes that very difficult. While in C#, this question helped me to figure it out: stackoverflow.com/questions/33733913/…
  • aggieNick02
    aggieNick02 almost 8 years
    This was a huge help. I had tried doing protobuf over sockets by using the Asio istream/ostream interface and wrapping them in IstreamInputStream/OstreamOutputStream, and couldn't get it working. Thanks for posting this. With it and Kenton's functions, you can fairly easily build a client/server that talks protobuf in C++ using Asio.
  • abergmeier
    abergmeier almost 8 years
    Your python code does not seem to work when using Python3. You would need to read bytes instead of strings for decoder to work.
  • Moncef Mechri
    Moncef Mechri almost 8 years
    Yes, this code was written for python 2, but it should be rather easy to adapt it and make it work for python 3. I've edited my post to indicate that this code targets python 2.
  • Stellan
    Stellan almost 7 years
    @KentonVarda, I'm looking for a way to write, for example, multiple users using a user message, and then be able to get one specific user by that user's byte position. Is this the solution I need?
  • aggieNick02
    aggieNick02 about 6 years
    @JimOldfield - totally agreed on the varints. If you control both sides of the connection, doing a 4 byte length greatly simplifies things.
  • py_newbie
    py_newbie almost 6 years
    Can you please confirm that stream is of type StringIO in Python?
  • Alejandro Silvestri
    Alejandro Silvestri over 5 years
    @KentonVarda this code looks great! But I can't make it work. I'm passing a new ::google::protobuf::io::OstreamOutputStream(&file) as rawOutput, and with a message 9458 bytes long it generates a file 8192 bytes long... Of course, deserialization fails; the message is truncated. Any thoughts?
  • Kenton Varda
    Kenton Varda over 5 years
    @AlejandroSilvestri It sounds like some buffers haven't been flushed. I believe both OstreamOutputStream and std::ostream buffer data. They'll flush that data when you destroy the objects. I think they might also have explicit flush methods you can call, if you don't want to destroy the objects yet.
  • Alejandro Silvestri
    Alejandro Silvestri over 5 years
    Confirmed. As @KentonVarda said, I had to destroy OstreamOutputStream before closing the ofstream file.
  • Maskim
    Maskim about 5 years
    Works perfectly! :) I used Parse/SerializePartialFromZeroCopyStream; the writing seemed OK (large file), but I could parse only the last message. Is it a problem with the file cursor? I don't get the difference between Parse/Serialize... and read/writeDelimited..., except that the latter two write the size of the message.
  • Nils
    Nils almost 5 years
    Had to read this twice because I assumed that "delimited" meant you would write some delimiter after the message. Instead you just write the size first...
  • Lukas
    Lukas over 4 years
    FYI, the PR listed above has been merged and the functions in this answer are now available in delimited_message_util.h.
  • Niki
    Niki about 3 years
    Beware that in recent versions of protobuf, google::protobuf::io::CodedInputStream input(rawInput); will immediately buffer/read lots of bytes, so don't rely on your IstreamInputStream stream object to know the actual offset of the current read message. That is, after reading the first message, stream.tell() may be at the end of the stream and not at the end of the first message in the stream!
  • Kenton Varda
    Kenton Varda about 3 years
    @Niki That has always been true. CodedInputStream asks the underlying ZeroCopyInputStream for a whole buffer of data immediately. CodedInputStream's destructor returns whatever portion of the buffer wasn't used. So if the CodedInputStream is still live, then the underlying ZeroCopyInputStream will be in an indeterminate state. That is how it worked when I wrote it in 2006...
  • Niki
    Niki about 3 years
    You are correct @KentonVarda . I confirmed that was the same in early version, it was a change of how we use it that caused the issue for us. Thank you for the correction!
  • dag
    dag almost 3 years
    This solution deserves to be much higher rated. Older answers don't reflect how the protobuf C++ lib has evolved to resolve what was obviously quite a shortcoming.