Send a JSON payload to an Apache Kafka topic in Spring
I was able to reproduce your error on my local setup:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String
If we look at the log, it says that value.serializer still refers to the default StringSerializer rather than the expected JsonSerializer, which in turn means your producer configuration is not taking effect. In short, your KafkaConfiguration class is not being picked up.
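To illustrate why a leftover String serializer produces a ClassCastException at send time rather than a compile error, here is a toy model in plain Java. The `Serializer` interface, `StringSerializer` class, and `send` method below are simplified, hypothetical stand-ins and not the real Kafka classes; the sketch only assumes that the serializer is chosen from configuration at runtime, so the generic value type is not checked until `serialize` is called.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Hypothetical stand-in for a Kafka-style serializer; not the real Kafka interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Toy equivalent of a String-only serializer.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Simplified version of the User model from the question.
    static class User {
        final String firstname;
        final String email;

        User(String firstname, String email) {
            this.firstname = firstname;
            this.email = email;
        }
    }

    // The serializer is only known at runtime, so the value type is not
    // verified until serialize() is actually invoked.
    @SuppressWarnings("unchecked")
    static <V> byte[] send(Serializer<?> configured, String topic, V value) {
        Serializer<V> serializer = (Serializer<V>) configured; // unchecked cast, permitted by type erasure
        return serializer.serialize(topic, value); // throws ClassCastException if V is not String
    }

    public static void main(String[] args) {
        Serializer<?> configured = new StringSerializer(); // the default that stayed in effect
        try {
            // The email value is a placeholder for illustration only.
            send(configured, "kafka-producer", new User("xyz", "xyz@example.com"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: User cannot be serialized as a String");
        }
    }
}
```

In the real stack trace above, the same kind of ClassCastException appears as the cause of the SerializationException that names value.serializer.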
Your KafkaConfiguration class is in a config package and your User class is in a com.xyz package. So the solution is to make sure that your configuration gets picked up. Most probably the config package is not being scanned for configuration/bean definitions. If you move KafkaConfiguration to the root package of your application, your original code should work fine.
If you say that your KafkaTemplate object is getting injected, it is actually not yours. The one being injected is defined by Spring Kafka auto-configuration.
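If moving the class is not convenient, an alternative (not part of the original suggestion) is to widen component scanning so that the config package is included. A minimal sketch, assuming the main class is called DemoApplication (a hypothetical name) and lives in com.xyz, while KafkaConfiguration stays in the config package shown in the question:

```java
package com.xyz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// DemoApplication is a hypothetical name; use your own main class.
// scanBasePackages lists every package that should be scanned for
// @Configuration and other component classes, including "config".
@SpringBootApplication(scanBasePackages = {"com.xyz", "config"})
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```

Once your own KafkaTemplate bean is registered, the auto-configured one should back off, since Spring Boot only creates it when no other KafkaTemplate bean is defined.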
Prashant Raghav
Updated on June 04, 2022
Comments
-
Prashant Raghav almost 2 years
I need to send (POST) a JSON payload to an Apache Kafka topic, but I am receiving the following error: "message": "Can't convert value of class com.xyz.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"
Spring also shows a class cast exception: java.lang.ClassCastException: com.xyz.User cannot be cast to java.lang.String
Following are my model, Kafka config, and controller.
    public class User {

        private String firstname;
        private String email;

        public User() {}

        public User(String firstname, String email) {
            super();
            this.firstname = firstname;
            this.email = email;
        }

        public String getFirstname() {
            return firstname;
        }

        public void setFirstname(String firstname) {
            this.firstname = firstname;
        }

        public String getEmail() {
            return email;
        }

        public void setEmail(String email) {
            this.email = email;
        }

        @Override
        public String toString() {
            return "UserModel [firstname=" + firstname + ", email=" + email + "]";
        }
    }
Kafka Configuration
    package config;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    import com.xyz.User;

    @Configuration
    public class KafkaConfiguration {

        @Bean
        public ProducerFactory<String, User> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(config);
        }

        @Bean
        public KafkaTemplate<String, User> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
Controller
    @RestController
    @RequestMapping("/kafka")
    public class UserController {

        @Autowired
        private KafkaTemplate<String, User> kafkaTemplate;

        private static String TOPIC = "kafka-producer";

        @PostMapping("/publish")
        public void getUserId(@RequestBody User user) {
            kafkaTemplate.send(TOPIC, user);
        }
    }
JSON payload sent from Postman
{ "firstname" : "xyz", "email" : "[email protected]" }
-
aurelius over 5 years
Is the producer configured in XML? Can you post that as well?
-
Prashant Raghav over 5 years
@aurelius, I am using Spring Boot; I don't have any XML configuration for the producer.
-
aurelius over 5 years
Then you need the KafkaTemplate, which is used to send the messages.
-
Prashant Raghav over 5 years
Yes, I am trying to send via KafkaTemplate: kafkaTemplate.send(TOPIC, user);
-
aurelius over 5 years
Can you post that code as well? I think you did not change the type of the message that you send from String to User.
-
Deadpool over 5 years
Can you post the import statements of the config class?
-
Prashant Raghav over 5 years
Added import statements as suggested by Deadpool.
-
Nishu Tayal over 5 years
Wondering if you have the Content-Type set to JSON in your request headers?
-
Paizo over 5 years
If you want to send only User, then you can create your own converter and specify it in the Kafka configuration. If you use a String or JSON serializer/deserializer, you have to pass a String or JSON accordingly, depending on your requirements.
-
Prashant Raghav over 5 years
@Nishu Tayal, the content type is set to JSON (application/json) in Postman.
-
OneCricketeer over 5 years
For performance reasons, move the ObjectWriter declaration outside of the method, and don't pretty-print the message.
-
OneCricketeer over 5 years
However, Spring should already know how to use Jackson to convert your objects: codenotfound.com/…
-
bittu over 5 years
Yes, you don't need to serialize manually. JsonSerializer internally uses ObjectMapper, so it will work with the User object as well. Refer to my answer to correct your config and for more details.