Kafka Serialization of an object

14,419

Solution 1

Hm, I haven't run into the same header issue that you are encountering but my project wasn't compiling correctly when I didn't provide a VerifiableProperties constructor in my encoder/decoder. It seems strange that the missing constructor would corrupt Jackson's deserialization though.

Perhaps try splitting up your encoder and decoder and include the VerifiableProperties constructor in both; you shouldn't need to implement Decoder[T] for serialization. I was able to successfully implement json de/serialization using ObjectMapper following the format in this post.

Good luck!

Solution 2

Bytebuffers .array() method is not very reliable. It depends on the particular implementation. You might want to try

ByteBuffer bb = message.payload()

byte[] b = new byte[bb.remaining()]
bb.get(b, 0, b.length);
return mapper.readValue(b, EventDetails.class) 
Share:
14,419
krakover
Author by

krakover

12 Years in the industry. Currently employed as the head of server at a mobile advertising startup.

Updated on June 26, 2022

Comments

  • krakover
    krakover almost 2 years

    I started playing with Kafka. I've set an a zookeeper configuration, and I managed to send and consume String messages. Now I am trying to pass an Object (in java), but from some reason, when parsing the Message in the consumer I have header issues. I tried several serialization options (using Decoder/Encoder), and all of the return the same header issue.

    Here is my code The producer:

            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");
            props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
            ProducerConfig config = new ProducerConfig(props);
            Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
            ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails());
            try {
               producer.send(data);
            } finally {
               producer.close();
            }
    

    And the consumer:

            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");
            props.put("zk.connectiontimeout.ms", "1000000");
            props.put("groupid", "test_group");
    
            // Create the connection to the cluster
            ConsumerConfig consumerConfig = new ConsumerConfig(props);
            ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    
            // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
            Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
                    consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
            List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");
    
            // create list of 4 threads to consume from each of the partitions
            ExecutorService executor = Executors.newFixedThreadPool(4);
    
            // consume the messages in the threads
            for (final KafkaMessageStream<EventDetails> stream: streams) {
                executor.submit(new Runnable() {
                    public void run() {
                        for(EventDetails event: stream) {
                            System.err.println("********** Got message" + event.toString());        
                        }
                    }
                });
            }
    

    and my Serializer:

    public  class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
        public Message toMessage(EventDetails eventDetails) {
            try {
                ObjectMapper mapper = new ObjectMapper(new SmileFactory());
                byte[] serialized = mapper.writeValueAsBytes(eventDetails);
                return new Message(serialized);
    } catch (IOException e) {
                e.printStackTrace();
                return null;   // TODO
            }
    }
        public EventDetails toEvent(Message message) {
            EventDetails event = new EventDetails();
    
            ObjectMapper mapper = new ObjectMapper(new SmileFactory());
            try {
                //TODO handle error
                return mapper.readValue(message.payload().array(), EventDetails.class);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
    
        }
    }
    

    And this is the error I get:

    org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse
     at [Source: N/A; line: -1, column: -1]
    

    When I worked with MessagePack and with plain writing to a ObjectOutputStream I got a similiar header issue. I also tried to add the payload CRC32 to the message, but that didn't help as well.

    What am I doing wrong here?