Spring Boot / Kafka Json Deserialization - Trusted Packages

Solution 1

OK, I have read the documentation in a bit more detail and have found an answer to my question. I am using Kotlin, so the creation of my consumer factory looks like this, with the trusted package set through JsonDeserializer.TRUSTED_PACKAGES:

@Bean
fun consumerFactory(): ConsumerFactory<String, FeedItem> {
    val configProps = HashMap<String, Any>()
    configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"
    configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
    configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain"
    return DefaultKafkaConsumerFactory(configProps)
}

Now I just need a way to override the creation of the Jackson ObjectMapper in the JsonDeserializer so that it can work with my Kotlin data classes that don't have a zero-argument constructor :)

Solution 2

Since you have the trusted-packages issue solved, for your next problem you could take advantage of the overloaded constructor

DefaultKafkaConsumerFactory(Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Deserializer<V> valueDeserializer)

and the JsonDeserializer "wrapper" of Spring Kafka:

JsonDeserializer(Class<T> targetType, ObjectMapper objectMapper)

Combining the above, for Java I have:

new DefaultKafkaConsumerFactory<>(properties,
        new IntegerDeserializer(),
        new JsonDeserializer<>(Foo.class,
                new ObjectMapper()
                        // KotlinModule lets Jackson build Kotlin data classes that have no zero-argument constructor
                        .registerModules(new KotlinModule(), new JavaTimeModule())
                        .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                        .setDateFormat(new ISO8601DateFormat())
                        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)));

Essentially, you can tell the factory to use your own Deserializers and for the Json one, provide your own ObjectMapper. There you can register the Kotlin Module as well as customize date formats and other stuff.
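Since the question is in Kotlin, here is a rough Kotlin sketch of the same idea. It is only an illustration under a few assumptions: the FeedItem fields and the bootstrapServers value are placeholders (neither is shown in the question), jacksonObjectMapper() from jackson-module-kotlin is used as a shortcut for an ObjectMapper with the Kotlin module registered, and addTrustedPackages is assumed to be available in the spring-kafka version in use.

```kotlin
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

// Placeholder payload type: the real FeedItem is not shown in the question.
data class FeedItem(val id: String, val title: String)

@Configuration
class KafkaConsumerConfig {

    // Placeholder value: in a real application this would come from configuration.
    private val bootstrapServers = "localhost:9092"

    @Bean
    fun consumerFactory(): ConsumerFactory<String, FeedItem> {
        val configProps = HashMap<String, Any>()
        configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"

        // ObjectMapper with the Kotlin module registered, so data classes
        // without a zero-argument constructor can be deserialized.
        val mapper = jacksonObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

        // Deserializer instance built by hand instead of by class name,
        // which is what allows the custom ObjectMapper to be passed in.
        val valueDeserializer = JsonDeserializer<FeedItem>(FeedItem::class.java, mapper)
        valueDeserializer.addTrustedPackages("co.orders.feedme.feed.domain")

        // Deserializer instances go to the factory directly, so the
        // *_DESERIALIZER_CLASS_CONFIG entries are not set in the map.
        return DefaultKafkaConsumerFactory(configProps, StringDeserializer(), valueDeserializer)
    }
}
```

Because the deserializers are passed as instances, settings such as the trusted packages are applied on the JsonDeserializer object itself rather than through the config map.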

Author by

hexkid

Greying developer with a particular interest in Kotlin

Updated on June 04, 2022

Comments

  • hexkid
    hexkid almost 2 years

    I am just starting to use Kafka with Spring Boot & want to send & consume JSON objects.

    I am getting the following error when I attempt to consume a message from the Kafka topic:

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:218) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.1.jar:na]
    

    I have attempted to add my package to the list of trusted packages by defining the following property in application.properties:

    spring.kafka.consumer.properties.spring.json.trusted.packages = co.orders.feedme.feed.domain
    

    This doesn't appear to make any difference. What is the correct way to add my package to the list of trusted packages for Spring's Kafka JsonDeserializer?

  • Tsolak Barseghyan
    Tsolak Barseghyan over 4 years
    The Java equivalent is similar: configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "co.orders.feedme.feed.domain");