Spring Boot Kafka class deserialization - not in the trusted package
Just use the overloaded JsonDeserializer constructor.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default).
The following example shows how to do so:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Your code:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Object.class, false));
}
And now use @KafkaListener at class level:
@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

    @KafkaHandler
    public void listen(Cat cat) {
        ...
    }

    @KafkaHandler
    public void listen(Hat hat) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void delete(Object obj) {
        ...
    }
}
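With Object.class as the target type, Jackson has no concrete class to bind to, so a JSON object arrives as a java.util.LinkedHashMap, and the handler is then chosen from that runtime type. The sketch below is a rough plain-Java model of that selection, not Spring's actual implementation; the HandlerDispatchSketch class, its nested Cat and Hat classes, and the route method are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HandlerDispatchSketch {

    // Hypothetical payload types standing in for the Cat and Hat classes above.
    static final class Cat {
        final String name;
        Cat(String name) { this.name = name; }
    }

    static final class Hat {
        final String color;
        Hat(String color) { this.color = color; }
    }

    // Picks a handler name from the payload's runtime type, falling back to a default.
    static String route(Object payload) {
        if (payload instanceof Cat) {
            return "listen(Cat)";
        }
        if (payload instanceof Hat) {
            return "listen(Hat)";
        }
        // Plays the role of the method marked @KafkaHandler(isDefault = true).
        return "delete(Object)";
    }

    public static void main(String[] args) {
        // What a JSON object looks like after deserialization to Object.
        Map<String, Object> untyped = new LinkedHashMap<>();
        untyped.put("name", "Tom");

        System.out.println(route(new Cat("Tom"))); // typed payload
        System.out.println(route(untyped));        // untyped map payload
    }
}
```

In this model a typed payload reaches its specific handler, while the untyped map only matches the default one. That is consistent with the default handler being the only method invoked when the value is deserialized to Object.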
Usr
Updated on September 15, 2022
Comments
-
Usr over 1 year
I know this problem is very common, but none of the solutions I tried worked. I want to deserialize both strings and objects of my custom class when receiving messages from Kafka. Strings work fine, but my class does not. I've added the trusted packages to the consumer configuration (com.springmiddleware.entities is the package that contains my class):
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
    return props;
}
I have this in my application.yml file:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'
And added these lines to application.properties:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
But the following error keeps showing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic2-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.springmiddleware.entities.Crime' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
UPDATE
ReceiverConfig:
@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
UPDATE 2
Listener class (Receiver):
@KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaHandler
    public void listen(@Payload Crime message) {
        System.out.println("Received " + message);
    }

    @KafkaHandler
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }
}
-
Usr about 5 years
Hi, thank you, but I need to use the deserializer without specifying the class because I deserialize different types of objects: strings and my class, in my case.
-
Usr about 5 years
Thank you, now it doesn't give me that problem, but I have a different error. It tells me: Error while processing: ConsumerRecord - org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
-
Deadpool about 5 years
Show the listener class. Are you doing batch? @TodorokiM
-
Usr about 5 years
Added the listener class to the question.
-
Deadpool about 5 years
Add the default @KafkaHandler method, @TodorokiM, and try that.
-
Usr about 5 years
Thanks, this way it works. The only problem is that I can't cast the object I'm receiving to my class. It throws an error saying that it cannot be cast to my class because it is in an unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader
-
Usr about 5 years
OK, I've tried printing the class of the object I'm receiving and it's a LinkedHashMap. Any idea why?
-
Deadpool about 5 years
Just add one more @KafkaHandler method with LinkedHashMap as its argument. It's not clear to me what type of payload you have in that topic, @TodorokiM; if you provide the appropriate POJO for the payload, it will work.
-
Usr about 5 years
Solved, I've used an ObjectMapper to convert the LinkedHashMap to my class object. Thank you so much!
-
Saahon about 4 years
Hi, I have a similar problem too. Only the public void delete(Object obj) method is invoked; the public void listen(Cat cat) method is not. How can I fix it?
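The ObjectMapper conversion mentioned in the comments can be sketched as follows. With Jackson on the classpath, the usual one-liner is new ObjectMapper().convertValue(map, Crime.class); to keep this example runnable without extra dependencies, the sketch copies the fields by hand instead. The nested Crime class and its id and description fields are assumptions, since the original class is not shown in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapToCrimeSketch {

    // Hypothetical stand-in for com.springmiddleware.entities.Crime; field names are assumed.
    static final class Crime {
        final int id;
        final String description;

        Crime(int id, String description) {
            this.id = id;
            this.description = description;
        }

        @Override
        public String toString() {
            return "Crime{id=" + id + ", description='" + description + "'}";
        }
    }

    // Copies the entries of the untyped map into a typed object.
    // A direct cast (Crime) payload would fail, because the payload is a map, not a Crime.
    static Crime toCrime(Map<String, Object> map) {
        int id = ((Number) map.get("id")).intValue();
        String description = (String) map.get("description");
        return new Crime(id, description);
    }

    public static void main(String[] args) {
        // Shape of the payload when JSON is deserialized to Object.
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("id", 7);
        payload.put("description", "burglary");

        System.out.println(toCrime(payload));
    }
}
```

Inside a listener, the same conversion would go in the handler that receives the LinkedHashMap, so that the rest of the code works with the typed object.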