Kafka Streams - SerializationException: Unknown magic byte


Unknown magic byte!

This means your data does not adhere to the wire format that the Confluent Schema Registry serializers expect: a single magic byte 0x0, followed by a four-byte schema ID, followed by the Avro-encoded payload.

In other words, the data you're trying to read is not Avro in that format, which is what the Confluent Avro deserializer expects.
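As a first debugging step, you can bypass the Avro deserializer entirely and inspect the raw bytes. Here's a minimal diagnostic sketch (mine, not from the original answer; the topic name is taken from the question) that consumes with ByteArrayDeserializer and reports whether each value starts with the Confluent wire-format header:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class WireFormatCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wire-format-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("intesttopic"));
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                    byte[] value = record.value();
                    // Confluent wire format: one magic byte (0x0), a 4-byte schema ID,
                    // then the Avro binary payload
                    boolean looksConfluent = value != null && value.length > 5 && value[0] == 0x0;
                    System.out.printf("offset=%d confluentWireFormat=%b%n",
                            record.offset(), looksConfluent);
                }
            }
        }
    }

If none of the values start with the 0x0 byte, the producer did not use the Confluent Avro serializer.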

You would get the same error by running kafka-avro-console-consumer against the topic, by the way, so you may want to debug with that too.

If you are sure your data is indeed Avro, and the schema itself is actually sent as part of each message (we would need to see your producer code to confirm), then you shouldn't use the Confluent Avro deserializers, which expect that specific byte format in the message. Instead, you could use ByteArrayDeserializer, read the raw Avro record yourself, and pass it to the Apache Avro BinaryDecoder class. As a bonus, you can extract that logic into your own Deserializer class.
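For example, here's a rough sketch of such a Deserializer (my code, not the question's), assuming the producer wrote plain binary-encoded Avro and that the writer schema matches the generated trackingReport class's schema:

    import java.util.Map;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class PlainAvroDeserializer implements Deserializer<trackingReport> {

        // Reads records using the schema embedded in the generated class
        private final SpecificDatumReader<trackingReport> reader =
                new SpecificDatumReader<>(trackingReport.class);

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed
        }

        @Override
        public trackingReport deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                // Plain Avro binary decoding; no magic byte or schema ID expected here
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                return reader.read(null, decoder);
            } catch (Exception e) {
                throw new SerializationException("Failed to decode Avro record", e);
            }
        }

        @Override
        public void close() {
            // nothing to close
        }
    }

Note this only works if producer and consumer agree on the schema out of band; the Confluent wire format exists precisely so that agreement can be looked up from the registry instead.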

Also, if the input topic is Avro, I don't think you should be using this property to read the values as strings:

    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
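If the topic values really are Confluent-serialized Avro, one option (a sketch mirroring the question's own config) is to make the Avro serde the default value serde instead:

    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
    // Use the Confluent Avro serde for values by default
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            SpecificAvroSerde.class.getName());
    // The default serde also needs to know where the Schema Registry lives
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8081");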


Comments

  • R. B over 4 years

    I am trying to create a Kafka Streams Application which processes Avro records, but I am getting the following error:

    Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    

    I am not sure what is causing this error. I am just trying to get the Avro records into the application first, where they will then be processed and output to another topic, but it doesn't seem to be working. I have included the code from the application below. Can anyone see why it is not working?

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
        Serde<String> stringSerde = Serdes.String();
        Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();
    
        specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
    
    
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));
    
    
        KStream<String, trackingReport> outputreports = inputreports;
    
        String outputTopic = "outtesttopic";
        outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));
    
        Topology topology = builder.build();
    
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    
  • R. B over 5 years
    Yeah I just checked running the command and it did not work either. My producer is the same as the one at: stackoverflow.com/questions/53781639/…
  • R. B over 5 years
    Yeah I understand that property but thought it was okay to override as I have done with Consumed.with
  • OneCricketeer over 5 years
    intesttopic is not the same topic as the one being sent to in the previous post
  • OneCricketeer over 5 years
    By the way, outputreports is an unnecessary variable. There's no need to copy the KStream variable to a new name
  • R. B over 5 years
    If I am not using the Confluent Avro deserializer, should I be creating a custom one?
  • OneCricketeer over 5 years
    Your deserializer needs to invert whatever serializer you used in the producer. In Kafka Streams, you have a Serde class that combines the two (see the sketch after these comments)... I'm not sure if that answers your question
  • Kevin Hooke over 4 years
    Re "Or if it is, and the schema is sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers" - what Deserializer should you use in this case? We're trying to read Avro messages from Oracle Golden Gate and getting the same error when attempting to deserialize with KafkaAvroDeserializer
  • OneCricketeer over 4 years
    @Kevin What serializers / converters did you configure GoldenGate with? If you used the Confluent ones, then those work with KafkaAvroDeserializer. If you used JSON converters, then you wouldn't use Avro. Also make sure you check both the key and value of the converter settings
  • Kevin Hooke over 4 years
    @cricket_007 KafkaAvroSerializer/KafkaAvroDeserializer with messages sent with GG Kafka Handler get the Unknown Magic Byte error, but the same KafkaAvroSerializer/KafkaAvroDeserializer for messages sent by GG using Kafka Connect Handler work as expected. Not sure if we had Kafka Handler misconfigured, but Kafka Connect Handler is working for us
  • OneCricketeer over 4 years
    @Kevin, I'm actually not familiar with GoldenGate, but happy to look into it if you create a full post with the issue rather than just a comment. The "Kafka Connect Handler" would use AvroConverter, which wraps those serializers you mentioned. Confluent only offers one combination of those classes, so I'm not sure which package names of the classes you're referring to
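To illustrate the Serde point from the comments above: a minimal sketch combining a custom serializer/deserializer pair into a single Serde with Kafka's Serdes.serdeFrom helper. PlainAvroSerializer is hypothetical; PlainAvroDeserializer is the sketch shown earlier in the answer:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    // The serializer must be the exact inverse of the deserializer
    Serde<trackingReport> plainAvroSerde = Serdes.serdeFrom(
            new PlainAvroSerializer(),     // hypothetical; mirrors the producer side
            new PlainAvroDeserializer());  // sketched earlier in the answer

    // Used like any other serde, e.g.:
    // builder.stream("intesttopic", Consumed.with(Serdes.String(), plainAvroSerde));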