Spring Boot Kafka class deserialization - not in the trusted packages

Just use the overloaded JsonDeserializer constructor.

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default).

The following example shows how to do so:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

Your code:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>(Object.class,false));
}

And now use @KafkaListener at class level:

@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

    @KafkaHandler
    public void listen(Cat cat) {
        ...
    }

    @KafkaHandler
    public void listen(Hat hat) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void delete(Object obj) {
        ...
    }

}
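
If the type headers should still be honored, so that each @KafkaHandler receives its own type rather than a generic object, one possible alternative is to trust the package on the deserializer instance itself. The error in the question lists only the default packages (java.util, java.lang), which suggests the spring.json.trusted.packages property never reached the instance that was passed to the factory. The fragment below is a minimal sketch, not a verified fix: it is meant as a replacement for the consumerFactory() bean inside the question's ReceiverConfig (same imports, same consumerConfigs() method), it relies on JsonDeserializer's addTrustedPackages method, and it assumes the producer actually sends type headers.

```java
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>();
    // Trust the entity package on this instance directly, instead of relying
    // on the property map being applied to a deserializer created by hand.
    valueDeserializer.addTrustedPackages("com.springmiddleware.entities");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(), valueDeserializer);
}
```

With this variant the target class comes from the record's type header, so it does not help when the producer is configured with spring.json.add.type.headers=false.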
Author: Usr

Updated on September 15, 2022

Comments

  • Usr
    Usr over 1 year

    I know this problem is very common, but none of the solutions I tried worked. I want to deserialize both strings and objects of my custom class when receiving messages from Kafka. Strings work fine, but my class does not. I've added the trusted packages to the consumer configuration (com.springmiddleware.entities is the package that contains my class):

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        return props;
    }
    

    I have this in my application.yml file:

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: foo
          auto-offset-reset: earliest
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: 'com.springmiddleware.entities'
    

    I also added these lines to application.properties:

    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 
    spring.kafka.producer.properties.spring.json.add.type.headers=false
    

    But the following error keeps showing:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic2-0 at offset 1. If needed, please seek past the record to continue consumption. Caused by: java.lang.IllegalArgumentException: The class 'com.springmiddleware.entities.Crime' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

    UPDATE

    ReceiverConfig:

    @EnableKafka
    @Configuration
    public class ReceiverConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
            props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
            return props;
        }

        @Bean
        public ConsumerFactory<String, Object> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                    new JsonDeserializer<>());
        }

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    

    UPDATE 2

    Listener class (Receiver):

    @KafkaListener(topics = "${app.topic.foo}")
    @Service
    public class Receiver {

        private CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

        @KafkaHandler
        public void listen(@Payload Crime message) {
            System.out.println("Received " + message);
        }

        @KafkaHandler
        public void listen(@Payload String message) {
            System.out.println("Received " + message);
        }
    }
    
  • Usr
    Usr about 5 years
    Hi, thank you, but I need to use the deserializer without specifying a class, because I deserialize different types of objects: strings and my own class, in my case.
  • Usr
    Usr about 5 years
    Thank you, that problem is gone now, but I get a different error: Error while processing: ConsumerRecord - org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
  • Deadpool
    Deadpool about 5 years
    Show the listener class. Are you using a batch listener? @TodorokiM
  • Usr
    Usr about 5 years
    I've added the listener class to the question.
  • Deadpool
    Deadpool about 5 years
    Add the default @KafkaHandler method, @TodorokiM, and try that.
  • Usr
    Usr about 5 years
    Thanks, that works. The only problem is that I can't cast the object I receive to my class. It throws an error saying that it cannot be cast to my class because it is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader
  • Usr
    Usr about 5 years
    OK, I've tried printing the class of the object I'm receiving, and it's a LinkedHashMap. Any idea why?
  • Deadpool
    Deadpool about 5 years
    Just add one more @KafkaHandler method with LinkedHashMap as the argument. I really can't tell what type of payload you have in that topic, @TodorokiM; if you provide the appropriate POJO for the payload, it will work.
  • Usr
    Usr about 5 years
    Solved: I used an ObjectMapper to convert the LinkedHashMap to an object of my class. Thank you so much!
  • Saahon
    Saahon about 4 years
    Hi, I have a similar problem. Only the public void delete(Object obj) method is called; the public void listen(Cat cat) method is not. How can I fix it?
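
A note on the LinkedHashMap seen in the comments above: when the JSON deserializer is told to target Object.class and to ignore the type headers, a JSON object is materialized as a generic map, so only a handler that accepts Object (or a map) can match, and a direct cast to the entity class fails. The thread resolved this by converting the map with Jackson's ObjectMapper (its convertValue(map, Crime.class) method performs this kind of field copy). The sketch below shows the same idea with the standard library only, so that it runs without extra dependencies. The Crime fields (id, description) and the CrimeMapper helper are hypothetical, since the real class is not shown in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for com.springmiddleware.entities.Crime;
// the real fields are not shown in the thread.
final class Crime {
    final long id;
    final String description;

    Crime(long id, String description) {
        this.id = id;
        this.description = description;
    }

    @Override
    public String toString() {
        return "Crime{id=" + id + ", description='" + description + "'}";
    }
}

// Hypothetical helper that copies the assumed fields out of the generic map.
final class CrimeMapper {
    private CrimeMapper() { }

    static Crime fromMap(Map<String, Object> payload) {
        Object id = payload.get("id");
        Object description = payload.get("description");
        // JSON numbers may arrive as Integer, Long or Double, so accept any Number.
        if (!(id instanceof Number) || !(description instanceof String)) {
            throw new IllegalArgumentException("Unexpected payload shape: " + payload);
        }
        return new Crime(((Number) id).longValue(), (String) description);
    }

    public static void main(String[] args) {
        // Simulates what the default handler receives for a JSON object payload.
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("id", 7);
        payload.put("description", "theft");
        // Casting the map itself to Crime would throw ClassCastException;
        // an explicit conversion is needed instead.
        System.out.println(fromMap(payload));
    }
}
```

In a listener, the default handler (the one taking Object) would check whether the payload is a Map and then call a conversion like this one before using the result.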