Kafka producer sending invalid characters

Those could be tab characters (because of indented JSON) that your console doesn't interpret well.

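If you disable indentation of the output generated by the object mapper, those characters might go away. Note that INDENT_OUTPUT is off by default on a newly created ObjectMapper, so this only has an effect if indentation was enabled somewhere else.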

try {
    mapper.disable(SerializationFeature.INDENT_OUTPUT); // <-- add this line
    json = mapper.writeValueAsString(p);
} catch (JsonProcessingException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
}
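
The prefix shown in the question, ��t�{, also resembles a Java-serialized String printed as text: a serialization stream starts with the bytes AC ED 00 05, followed by 0x74 (the letter t, which marks a String) and a two-byte length. Whether the producer in the question actually Java-serializes its payload is an assumption here, not something the posted configuration confirms. The sketch below uses only the JDK; the class name SerializationPrefixDemo and its helper methods are made up for illustration. It compares the leading bytes of a JSON string encoded as plain UTF-8 with the same string written through ObjectOutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the question's code.
class SerializationPrefixDemo {

    // Writes the String through java.io.ObjectOutputStream and returns the raw bytes.
    static byte[] javaSerialize(String s) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(s);
        } catch (IOException e) {
            // An in-memory stream should not fail; rethrow unchecked just in case.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Renders the first `count` bytes as space-separated upper-case hex.
    static String toHex(byte[] bytes, int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(count, bytes.length); i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String json = "{\"productId\":2455}";
        byte[] plain = json.getBytes(StandardCharsets.UTF_8);
        byte[] serialized = javaSerialize(json);

        System.out.println("plain UTF-8 starts with:     " + toHex(plain, 7));
        System.out.println("Java-serialized starts with: " + toHex(serialized, 7));
        // Decoding the serialized bytes as text shows what a console would try to print.
        System.out.println(new String(serialized, StandardCharsets.UTF_8));
    }
}
```

If the bytes read back from the topic start with AC ED 00 05, the cause would more likely be how the producer encodes the value than the ObjectMapper settings.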
Author: Utkarsh Mishra

Updated on June 09, 2022

Comments

  • Utkarsh Mishra, almost 2 years ago

    Using the following code, I send Elasticsearch documents for indexing. I convert plain objects to JSON and send them via the producer. However, every message (as checked from the console) is prefixed with gibberish characters, for example: ��t�{"productId":2455

    public boolean sendMessage()
    {
        PageRequest page = new PageRequest(0, 1); 
        Product p = product.findByName("Cream", page).getContent().get(0);
        String json = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            json = mapper.writeValueAsString(p);
        } catch (JsonProcessingException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }       
        logger.info("JSON = " + json);
    
        boolean status =  inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build());
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return status;
    }
    

    Outbound configuration

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int:channel id="inputToKafka">
            <int:queue/>
        </int:channel>
    
        <int-kafka:outbound-channel-adapter
                id="kafkaOutboundChannelAdapter"
                kafka-producer-context-ref="kafkaProducerContext"
                channel="inputToKafka">
            <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
        </int-kafka:outbound-channel-adapter>
    
        <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
    
        <int-kafka:producer-context id="kafkaProducerContext">
            <int-kafka:producer-configurations>
                <int-kafka:producer-configuration broker-list="localhost:9092"
                                                  topic="test_topic"
                                                  compression-codec="default"/>
            </int-kafka:producer-configurations>
        </int-kafka:producer-context>
    
    </beans>
    

    Any clue?

    Plugin used: Spring Extension Kafka