How to encode/decode Kafka messages using Avro binary encoder?

55,509

Solution 1

I finally remembered to ask the Kafka mailing list and got the following as an answer, which worked perfectly.

Yes, you can send messages as byte arrays. If you look at the constructor of the Message class, you will see -

def this(bytes: Array[Byte])

Now, looking at the Producer send() API -

def send(producerData: ProducerData[K,V]*)

You can set V to be of type Message and K to what you want your key to be. If you don't care about partitioning using a key, then set that to Message type as well.

Thanks, Neha

Solution 2

This is a basic example. I have not tried it with multiple partitions/topics.

//Sample producer code

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class ProducerTest {

    void producer(Schema schema) throws IOException {

        Properties props = new Properties();
        props.put("metadata.broker.list", "0:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
        GenericRecord payload1 = new GenericData.Record(schema);
        //Step2 : Put data in that genericrecord object
        payload1.put("desc", "'testdata'");
        //payload1.put("name", "अasa");
        payload1.put("name", "dbevent1");
        payload1.put("id", 111);
        System.out.println("Original Message : "+ payload1);
        //Step3 : Serialize the object to a bytearray
        DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload1, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);
        //String serializedHex = Hex.encodeHexString(serializedBytes);
        //System.out.println("Serialized Hex String : " + serializedHex);
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes);
        producer.send(message);
        producer.close();

    }


    public static void main(String[] args) throws IOException, DecoderException {
        ProducerTest test = new ProducerTest();
        Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
        test.producer(schema);
    }
}

//Sample consumer code

Part 1 : Consumer group code : as you can have more than multiple consumers for multiple partitions/ topics.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by  on 9/1/15.
 */
public class ConsumerGroupExample {
   private final ConsumerConnector consumer;
   private final String topic;
   private ExecutorService executor;

   public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
              createConsumerConfig(a_zookeeper, a_groupId));
      this.topic = a_topic;
   }

   private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){
       Properties props = new Properties();
       props.put("zookeeper.connect", a_zookeeper);
       props.put("group.id", a_groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");

       return new ConsumerConfig(props);
   }

    public void shutdown(){
         if (consumer!=null) consumer.shutdown();
        if (executor!=null) executor.shutdown();
        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        try{
          if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){

          }
        }catch(InterruptedException e){
            System.out.println("Interrupted");
        }

    }


    public void run(int a_numThreads){
        //Make a map of topic as key and no. of threads for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        //Create message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        //initialize thread pool
        executor = Executors.newFixedThreadPool(a_numThreads);
        //start consuming from thread
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }


}

Part 2 : Indiviual consumer that actually consumes the messages.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;

import java.io.File;
import java.io.IOException;

public class ConsumerTest implements Runnable{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run(){
        ConsumerIterator<byte[], byte[]>it = m_stream.iterator();
        while(it.hasNext())
        {
            try {
                //System.out.println("Encoded Message received : " + message_received);
                //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray());
                //System.out.println("Deserializied Byte array : " + input);
                byte[] received_message = it.next().message();
                System.out.println(received_message);
                Schema schema = null;
                schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
                DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
                GenericRecord payload2 = null;
                payload2 = reader.read(null, decoder);
                System.out.println("Message received : " + payload2);
            }catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
            }
        }

    }


}

Test AVRO schema :

{
    "namespace": "xyz.test",
     "type": "record",
     "name": "payload",
     "fields":[
         {
            "name": "name", "type": "string"
         },
         {
            "name": "id",  "type": ["int", "null"]
         },
         {
            "name": "desc", "type": ["string", "null"]
         }
     ]
}

Important things to note are :

  1. Youll need the standard kafka and avro jars to run this code out of the box.

  2. Is very important props.put("serializer.class", "kafka.serializer.DefaultEncoder"); Dont use stringEncoder as that wont work if you are sending a byte array as message.

  3. You can convert the byte[] to a hex string and send that and on the consumer reconvert hex string to byte[] and then to the original message.

  4. Run the zookeeper and the broker as mentioned here :- http://kafka.apache.org/documentation.html#quickstart and create a topic called "page_views" or whatever you want.

  5. Run the ProducerTest.java and then the ConsumerGroupExample.java and see the avro data being produced and consumed.

Solution 3

If you want to get a byte array from an Avro message (the kafka part is already answered), use the binary encoder:

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try {
        Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
        writer.write(record, e); 
        e.flush(); 
        byte[] byteData = os.toByteArray(); 
    } finally {
        os.close(); 
    }

Solution 4

Updated Answer.

Kafka has an Avro serializer/deserializer with Maven (SBT formatted) coordinates:

  "io.confluent" % "kafka-avro-serializer" % "3.0.0"

You pass an instance of KafkaAvroSerializer into the KafkaProducer constructor.

Then you can create Avro GenericRecord instances, and use those as values inside Kafka ProducerRecord instances which you can send with KafkaProducer.

On the Kafka consumer side, you use KafkaAvroDeserializer and KafkaConsumer.

Solution 5

Instead of Avro, you could also simply consider compressing data; either with gzip (good compression, higher cpu) or LZF or Snappy (much faster, bit slower compression).

Or alternatively there is also Smile binary JSON, supported in Java by Jackson (with this extension): it is compact binary format, and much easier to use than Avro:

ObjectMapper mapper = new ObjectMapper(new SmileFactory());
byte[] serialized = mapper.writeValueAsBytes(pojo);
// or back
SomeType pojo = mapper.readValue(serialized, SomeType.class);

basically same code as with JSON, except for passing different format factory. From data size perspective, whether Smile or Avro is more compact depends on details of use case; but both are more compact than JSON.

Benefit there is that this works fast with both JSON and Smile, with same code, using just POJOs. Compared to Avro which either requires code generation, or lots of manual code to pack and unpack GenericRecords.

Share:
55,509
blockcipher
Author by

blockcipher

Updated on August 14, 2021

Comments

  • blockcipher
    blockcipher almost 3 years

    I'm trying to use Avro for messages being read from/written to Kafka. Does anyone have an example of using the Avro binary encoder to encode/decode data that will be put on a message queue?

    I need the Avro part more than the Kafka part. Or, perhaps I should look at a different solution? Basically, I'm trying to find a more efficient solution to JSON with regards to space. Avro was just mentioned since it can be more compact than JSON.