Kafka Streams - SerializationException: Unknown magic byte
Unknown magic byte!
This means your data does not adhere to the wire format expected by the Schema Registry. Or, in other words, the data you're trying to read is not Avro, as expected by the Confluent Avro deserializer.
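As a sketch of what that wire format looks like: every record written by the Confluent serializer starts with a zero "magic" byte, then a 4-byte big-endian schema ID, then the Avro-encoded payload. The class and method names below are illustrative, not Confluent's actual ones; only the plain JDK is used.

```java
import java.nio.ByteBuffer;

// A minimal sketch of the check the Confluent deserializer performs.
// Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro payload.
public class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID if the record follows the wire format,
    // otherwise throws, much like the "Unknown magic byte!" error does.
    public static int schemaId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt(); // the 4 bytes after the magic byte
    }

    public static void main(String[] args) {
        // A record produced by KafkaAvroSerializer with schema ID 7:
        byte[] good = {0x0, 0, 0, 0, 7, /* Avro bytes would follow */ 42};
        System.out.println(schemaId(good)); // prints 7

        // A plain string record fails the magic-byte check:
        try {
            schemaId("not avro".getBytes());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Unknown magic byte!
        }
    }
}
```

Any record that was produced as a plain string, JSON, or even raw Avro without that 5-byte prefix fails this check immediately.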
You can expect the same error by running kafka-avro-console-consumer, by the way, so you may want to debug using that too.
If you are sure your data is indeed Avro, and the schema is actually sent as part of the message (we would need to see your producer code), then you shouldn't use the Confluent Avro deserializers, which expect a specific byte format in the message. Instead, you could use ByteArrayDeserializer and read the Avro record yourself, then pass it to the Apache Avro BinaryDecoder class. As a bonus, you can extract that logic into your own Deserializer class.
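A minimal sketch of that approach, reusing the trackingReport class generated from the question's schema (assumption: trackingReport is an Avro SpecificRecord; this needs the Apache Avro and Kafka client artifacts on the classpath to compile):

```java
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

// A hand-rolled Deserializer for Avro records written WITHOUT the
// Confluent wire format (no magic byte, no schema ID prefix).
public class TrackingReportDeserializer implements Deserializer<trackingReport> {

    private final SpecificDatumReader<trackingReport> reader =
            new SpecificDatumReader<>(trackingReport.class);

    @Override
    public trackingReport deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Decode the raw bytes directly with Avro's BinaryDecoder,
            // using the schema compiled into the trackingReport class.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException("Failed to decode Avro record", e);
        }
    }
}
```

You could then pair this with a matching Serializer via Serdes.serdeFrom to get a Serde usable in Consumed.with.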
Also, if the input topic is Avro, I don't think you should be using this property for reading strings:

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
R. B
Updated on January 21, 2020

Comments
-
R. B over 4 years
I am trying to create a Kafka Streams application that processes Avro records, but I am getting the following error:
Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I am not sure what is causing this error. I am just trying to get Avro records into the application, where they will then be processed and output to another topic, but it does not seem to be working. I have included the code from the application below. Can anyone see why it is not working?
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Serde<String> stringSerde = Serdes.String();
Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();
specificAvroTrackingReportSerde.configure(
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"),
    false);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, trackingReport> inputreports = builder.stream("intesttopic",
    Consumed.with(stringSerde, specificAvroTrackingReportSerde));

KStream<String, trackingReport> outputreports = inputreports;

String outputTopic = "outtesttopic";
outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
-
R. B over 5 years: Yeah, I just checked running the command and it did not work either. My producer is the same as the one at: stackoverflow.com/questions/53781639/…
-
R. B over 5 years: Yeah, I understand that property, but thought it was okay to override it, as I have done with Consumed.with
-
OneCricketeer over 5 years: intesttopic is not the same topic as the one being sent to in the previous post
-
OneCricketeer over 5 years: By the way, outputreports is an unnecessary variable. There's no need to copy the KStream variable to a new name
-
R. B over 5 years: If I am not using the Confluent Avro deserializer, should I be creating a custom one?
-
OneCricketeer over 5 years: Your deserializer needs to invert whatever serializer you used in the producer. In Kafka Streams, you have a Serde class that combines the two... I'm not sure if that answers your question
-
Kevin Hooke over 4 years: Re "Or if it is, and the schema is sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers" - what Deserializer should you use in this case? We're trying to read Avro messages from Oracle GoldenGate and getting the same error when attempting to deserialize with KafkaAvroDeserializer
-
OneCricketeer over 4 years: @Kevin What serializers/converters did you configure GoldenGate with? If you used the Confluent ones, then those work with KafkaAvroDeserializer. If you used JSON converters, then you wouldn't use Avro. Also, make sure you check both the key and value converter settings
-
Kevin Hooke over 4 years: @cricket_007 KafkaAvroSerializer/KafkaAvroDeserializer with messages sent with the GG Kafka Handler get the Unknown Magic Byte error, but the same KafkaAvroSerializer/KafkaAvroDeserializer for messages sent by GG using the Kafka Connect Handler work as expected. Not sure if we had the Kafka Handler misconfigured, but the Kafka Connect Handler is working for us
-
OneCricketeer over 4 years: @Kevin, I'm actually not familiar with GoldenGate, but happy to look into it if you create a full post with the issue rather than just a comment. The "Kafka Connect Handler" would use AvroConverter, which wraps those serializers you mentioned. Confluent only offers one combo of those classes, so I'm not sure what package names of those classes you're referring to