Kafka Serialization of an object
Solution 1
Hm, I haven't run into the header issue you are seeing, but my project wouldn't compile correctly until I provided a VerifiableProperties constructor in my encoder/decoder. It seems strange that a missing constructor would corrupt Jackson's deserialization, though.
Perhaps try splitting your encoder and decoder into separate classes and including the VerifiableProperties constructor in both; you shouldn't need to implement Decoder[T] for serialization. I was able to implement JSON de/serialization successfully using ObjectMapper, following the format in this post.
Good luck!
Solution 2
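ByteBuffer's array() method is not very reliable here. Its behavior depends on the particular buffer implementation, and it returns the whole backing array rather than only the bytes between the buffer's position and limit. You might want to try:
ByteBuffer bb = message.payload();
byte[] b = new byte[bb.remaining()];
bb.get(b, 0, b.length);
return mapper.readValue(b, EventDetails.class);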
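To make the difference concrete, here is a minimal, JDK-only sketch. It does not use Kafka or Jackson; the 6-byte zeroed header, the class name PayloadCopyDemo and both helper methods are hypothetical stand-ins for a framed message whose payload() is a slice of a larger buffer. Under that assumption, array() on the slice exposes the header bytes as well, which would explain a parser seeing 0x0 as the first byte, while copying the remaining bytes yields only the payload.

```java
import java.nio.ByteBuffer;

public class PayloadCopyDemo {

    // Hypothetical header size; stands in for whatever framing precedes the payload.
    public static final int HEADER_SIZE = 6;

    /** Builds [header | payload] and returns a slice that covers only the payload. */
    public static ByteBuffer payloadSlice(byte[] payload) {
        ByteBuffer framed = ByteBuffer.allocate(HEADER_SIZE + payload.length);
        framed.put(new byte[HEADER_SIZE]); // zeroed header bytes
        framed.put(payload);
        framed.position(HEADER_SIZE);      // limit stays at capacity
        return framed.slice();             // shares the backing array with 'framed'
    }

    /** Copies exactly the bytes between position and limit, leaving the caller's buffer untouched. */
    public static byte[] copyRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // independent position and limit
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        // Smile header bytes ':' ')' '\n' followed by one version/flags byte.
        byte[] payload = {0x3A, 0x29, 0x0A, 0x01};
        ByteBuffer slice = payloadSlice(payload);

        byte[] viaArray = slice.array();       // whole backing array, header included
        byte[] viaCopy = copyRemaining(slice); // payload only

        System.out.println("array(): length " + viaArray.length + ", first byte " + viaArray[0]);
        System.out.println("copy:    length " + viaCopy.length + ", first byte " + viaCopy[0]);
    }
}
```

If array() has to be used, slice.arrayOffset() and slice.remaining() give the payload's start and length inside the backing array, but only when hasArray() returns true.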
krakover
Updated on June 26, 2022
I started playing with Kafka. I've set up a ZooKeeper configuration, and I managed to send and consume String messages. Now I am trying to pass an Object (in Java), but for some reason, when parsing the Message in the consumer, I have header issues. I tried several serialization options (using Decoder/Encoder), and all of them return the same header issue.
Here is my code. The producer:
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
ProducerConfig config = new ProducerConfig(props);
Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
ProducerData<Long, EventDetails> data =
    new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()));
try {
    producer.send(data);
} finally {
    producer.close();
}
And the consumer:
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic "test", to allow 4 threads to consume
Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
    consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for (final KafkaMessageStream<EventDetails> stream : streams) {
    executor.submit(new Runnable() {
        public void run() {
            for (EventDetails event : stream) {
                System.err.println("********** Got message" + event.toString());
            }
        }
    });
}
and my Serializer:
public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {

    public Message toMessage(EventDetails eventDetails) {
        try {
            ObjectMapper mapper = new ObjectMapper(new SmileFactory());
            byte[] serialized = mapper.writeValueAsBytes(eventDetails);
            return new Message(serialized);
        } catch (IOException e) {
            e.printStackTrace();
            return null; // TODO
        }
    }

    public EventDetails toEvent(Message message) {
        EventDetails event = new EventDetails();
        ObjectMapper mapper = new ObjectMapper(new SmileFactory());
        try {
            //TODO handle error
            return mapper.readValue(message.payload().array(), EventDetails.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}
And this is the error I get:
org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse at [Source: N/A; line: -1, column: -1]
When I worked with MessagePack, and with plain writing to an ObjectOutputStream, I got a similar header issue. I also tried to add the payload CRC32 to the message, but that didn't help either. What am I doing wrong here?