Kafka producer sending invalid characters
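Those could be tab characters (from indented JSON) that your console doesn't render well.
If you disable indentation of the output generated by the object mapper, those characters might go away. Note that INDENT_OUTPUT is already off on a freshly constructed ObjectMapper, so this only helps if indentation was enabled somewhere else.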
try {
    // Add this line: turn off pretty-printing so the output has no tabs or line breaks
    mapper.disable(SerializationFeature.INDENT_OUTPUT);
    json = mapper.writeValueAsString(p);
} catch (JsonProcessingException e1) {
    // Serialization failed; json keeps its previous value
    e1.printStackTrace();
}
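A quick way to test this theory is to check the generated string for tabs or line breaks before it is handed to the producer. The sketch below uses only the JDK; IndentCheck and containsIndentWhitespace are hypothetical names, and the two sample strings are made up for illustration rather than taken from the asker's application.

```java
public class IndentCheck {

    /** Returns true if the text contains a tab, line feed, or carriage return. */
    public static boolean containsIndentWhitespace(String json) {
        return json.indexOf('\t') >= 0
                || json.indexOf('\n') >= 0
                || json.indexOf('\r') >= 0;
    }

    public static void main(String[] args) {
        // Hypothetical samples: compact output versus tab-indented output.
        String compact = "{\"productId\":2455}";
        String indented = "{\n\t\"productId\" : 2455\n}";

        System.out.println(containsIndentWhitespace(compact));  // false
        System.out.println(containsIndentWhitespace(indented)); // true
    }
}
```

If the check returns false for the string that is logged as "JSON = ...", the extra characters are being added after serialization, and disabling indentation will not remove them.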
Author by
Utkarsh Mishra
Updated on June 09, 2022
Comments
Utkarsh Mishra almost 2 years
Using the following code, I send Elasticsearch documents for indexing. I tried converting basic objects to JSON and sending them via the producer. However, every message (as checked from the console) is prefixed with gibberish characters like ��t�{"productId":2455
public boolean sendMessage() {
    PageRequest page = new PageRequest(0, 1);
    Product p = product.findByName("Cream", page).getContent().get(0);
    String json = "";
    ObjectMapper mapper = new ObjectMapper();
    try {
        json = mapper.writeValueAsString(p);
    } catch (JsonProcessingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
    logger.info("JSON = " + json);
    boolean status = inputToKafka.send(
            org.springframework.integration.support.MessageBuilder.withPayload(json).build());
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return status;
}
Outbound configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="test_topic"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>
Any clue?
Plugin used: Spring Extension Kafka
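To see what the characters in front of the opening brace actually are, the raw bytes can be dumped in hex instead of being read off the console. As a point of comparison, the sketch below also serializes a string with java.io.ObjectOutputStream, whose output starts with non-text header bytes that a console would typically render as unreadable characters. That the Kafka adapter in this setup encodes payloads this way is an assumption, not something confirmed here; PrefixDump, leadingBytesHex, and javaSerialize are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class PrefixDump {

    /** Hex-dumps the bytes that come before the first '{' (or all bytes if there is none). */
    public static String leadingBytesHex(byte[] payload) {
        StringBuilder sb = new StringBuilder();
        for (byte b : payload) {
            if (b == '{') {
                break;
            }
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    /** Serializes the string with standard Java object serialization (assumed encoding, for comparison only). */
    public static byte[] javaSerialize(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"productId\":2455}";

        // Plain UTF-8 bytes: nothing precedes the opening brace.
        System.out.println("[" + leadingBytesHex(json.getBytes(StandardCharsets.UTF_8)) + "]"); // []

        // Java-serialized string: stream header, string tag, and 2-byte length come first.
        System.out.println(leadingBytesHex(javaSerialize(json))); // AC ED 00 05 74 00 12
    }
}
```

If the bytes captured from the topic start with a similar header, the prefix comes from how the payload is encoded on the way to Kafka rather than from the JSON text itself.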