Send a json payload to Apache Kafka topic in Spring

11,088

Able to reproduce your error at my local setup.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String

If we look at log then it says value.serializer value still referring to default StringSerializer rather than expected is JsonSerializer which in turn says your producer configuration are not getting into effect. In short your KafkaCOnfiguration class is not being referred.

Your KafkaConfiguration class is in some config package and User class in some com.xyz package. So solution would be to make sure that it gets picks up your configuration. Most probably that package may not getting scanned for configuration/beans definition. If you move KafkaConfiguration to root package of your application then your original code should work fine.

If you say that your KafkaTemplate object getting injected then it's actually not. The one which is getting injected is defined by Spring kafka Autoconfiguration.

Share:
11,088
Prashant Raghav
Author by

Prashant Raghav

Updated on June 04, 2022

Comments

  • Prashant Raghav
    Prashant Raghav almost 2 years

    I need to send (Post) Json payload to Apache Kafka topic but I am receiving the following error :- "message": "Can't convert value of class com.xyz.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"

    Also Spring shows class cast exception :- java.lang.ClassCastException: com.xyz.User cannot be cast to java.lang.String

    Following is my modal,kafka config and controller

    public class User {
        private String firstname;
        private String email;
    
    
        public User() {}
        public User(String firstname, String email) {
            super();
            this.firstname = firstname;
            this.email = email;
        }
        public String getFirstname() {
            return firstname;
        }
        public void setFirstname(String firstname) {
            this.firstname = firstname;
        }
        public String getEmail() {
            return email;
        }
        public void setEmail(String email) {
            this.email = email;
        }
        @Override
        public String toString() {
            return "UserModel [firstname=" + firstname + ", email=" + email + "]";
        }
    
    
    }
    

    Kafka Configuration

     package config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;
    
    import com.xyz.User;
    
    @Configuration
    public class KafkaConfiguration {
    
        @Bean
        public ProducerFactory<String, User> producerFactory(){
            Map<String, Object> config = new HashMap<>();
    
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
            return new DefaultKafkaProducerFactory<>(config);
    
        }
         @Bean
            public KafkaTemplate<String, User> kafkaTemplate() {
                return new KafkaTemplate<>(producerFactory());
            }
    
    }
    

    Controller

    @RestController
    @RequestMapping("/kafka")
    public class UserController {
        @Autowired
        private KafkaTemplate<String, User> kafkaTemplate;
    
        private static String TOPIC = "kafka-producer";
        @PostMapping("/publish")
        public void getUserId(@RequestBody User user) {
    
            kafkaTemplate.send(TOPIC, user);
    

    Json Payload send from postman

    {
        "firstname" : "xyz",
        "email" : "[email protected]"
    
    }
    
    • aurelius
      aurelius over 5 years
      the producer is configured in xml? can you post that one as well?
    • Prashant Raghav
      Prashant Raghav over 5 years
      @aurelius , I am using spring-boot , i don't have any xml configurations for producer.
    • aurelius
      aurelius over 5 years
      then you need the KafkaTemplate, which is used to send the messages
    • Prashant Raghav
      Prashant Raghav over 5 years
      yes i am trying to send via kafkatemplate kafkaTemplate.send(TOPIC, user);
    • aurelius
      aurelius over 5 years
      can you post that code as well, I think you did not change the type of the message that you send, from string to User
    • Deadpool
      Deadpool over 5 years
      can you post the import statements of config class
    • Prashant Raghav
      Prashant Raghav over 5 years
      added import statements as suggested by Deadpool
    • Nishu Tayal
      Nishu Tayal over 5 years
      wondering if you have a content-type set to json type in your request headers?
    • Paizo
      Paizo over 5 years
      If you are want to send only USER than you can create your own converter and specify it in the kafka configuration. if you use String serializer/deserializer or json well, you have to pass a string or json, depends on your requirements.
    • Prashant Raghav
      Prashant Raghav over 5 years
      @Nishu Tayal - content type is set to Json(application/json) in postman
  • OneCricketeer
    OneCricketeer over 5 years
    For performance reasons, move the ObjectWriter declaration outside of the method, and don't pretty print the message
  • OneCricketeer
    OneCricketeer over 5 years
    However, Spring should already know how to use Jackson to convert your objects codenotfound.com/…
  • bittu
    bittu over 5 years
    Yes. you don't need to manually serialze. JsonSerializer internally usage ObjectMapper so it will work with User object as well. Refer my answer to correct your config and more details