How to produce a JSON object message into a Kafka topic using Java (Spring)?

Solution 1

That solved my problem:

    // Requires java.util.Properties and, from org.apache.kafka.clients.producer:
    // KafkaProducer, Producer, ProducerRecord, RecordMetadata.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    blobStorageChecker = new BlobStorageChecker();
    String folder = blobStorageChecker.getCurrentDateUTC();
    // The JSON payload is an ordinary Java String, so its inner quotes are escaped.
    String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
    ProducerRecord<String, String> record = new ProducerRecord<>("event-orsted-v1", null, msg);

    // Create the producer once; try-with-resources closes it even if send() fails.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        // get() blocks until the broker acknowledges the record.
        RecordMetadata metadata = producer.send(record).get();
    } catch (Exception e) {
        System.err.println(e.getMessage());
        e.printStackTrace();
    }
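
The hand-escaped payload above is easy to get wrong, so it may help to isolate the string building in a small helper. The following is only a sketch: the class name `TargetFileMessage` and the method `buildMessage` are hypothetical, and it covers just the `targetFileInfo` part of the message from the question.

```java
public class TargetFileMessage {

    // Builds the JSON text as a plain Java String.
    // Assumption: folder and fileName contain no quotes or backslashes,
    // because String.format does not perform any JSON escaping.
    static String buildMessage(String folder, String fileName) {
        return String.format("{\"targetFileInfo\":{\"path\":\"%s/%s\"}}", folder, fileName);
    }

    public static void main(String[] args) {
        // Prints the message pattern from the question.
        System.out.println(buildMessage("2018-05-07-10", "row01-small-01.txt.ready"));
    }
}
```

The resulting String can be passed as the value of a `ProducerRecord<String, String>`. If the values may contain characters that need escaping, building the payload with a JSON library such as Jackson is the safer choice.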

Solution 2

As per your comment, you want to send a JsonNode as the message on Kafka. To do that, write a custom Serializer / Deserializer for JsonNode.

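import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonNodeSerDes implements Serializer<JsonNode>, Deserializer<JsonNode> {

    // A configured ObjectMapper is thread-safe, so one instance is reused.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null; // pass null values through unchanged
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Fail loudly instead of silently sending an empty payload.
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public JsonNode deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.readTree(data);
        } catch (IOException e) {
            // Surface malformed payloads instead of returning null.
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}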

I wrote the serializer and deserializer in the same class. You can split them into two classes (one implementing Serializer, the other implementing Deserializer).

When creating the KafkaProducer, set the "value.serializer" config to this class; for the KafkaConsumer, set the "value.deserializer" config to it.
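
As a rough sketch of that configuration, the properties for both sides could be assembled as below. The class `JsonNodeKafkaConfig`, the package name `com.example`, and the group id `demo-group` are assumptions made for illustration; substitute the package where JsonNodeSerDes actually lives.

```java
import java.util.Properties;

public class JsonNodeKafkaConfig {

    // Hypothetical fully qualified class name; adjust to your own package.
    static final String JSON_SERDES = "com.example.JsonNodeSerDes";

    // Producer side: keys stay Strings, values go through the custom serializer.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", JSON_SERDES);
        return props;
    }

    // Consumer side: the same class is registered as the value deserializer.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", JSON_SERDES);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092").getProperty("value.serializer"));
        System.out.println(consumerProps("localhost:9092", "demo-group").getProperty("value.deserializer"));
    }
}
```

With the Kafka client on the classpath, the producer properties would be passed to `new KafkaProducer<String, JsonNode>(props)`, after which a record can be sent with `producer.send(new ProducerRecord<>("event-orsted-v1", jsonNode))`.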

External Dependencies used:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.8.8</version>
</dependency>


Author: guguli

Updated on February 17, 2020

Comments

  • guguli
    guguli about 4 years

    I want to produce a message to a Kafka topic. The message should follow this pattern:

       {"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}
    

    I know this is JSON, so how can I convert that JSON into a String?

    I use a Maven project, so which dependencies are needed to use:

     String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});
    

    So I think it is better not to convert the JSON to a string and instead send that message to the Kafka topic directly.

    My code looks like this. It can send a String, but I don't know how to modify it to send the message above. Maybe you can help me.

     Producer<String, String> producer = null;
    
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
        producer = new KafkaProducer<>(props);
        String msg = "welcome";
        producer.send(new ProducerRecord<String, String>("event", msg));
    
        producer.close();
    
    • guguli
      guguli almost 6 years
      stringify is not offered as an option I can select.
    • Pratapi Hemant Patel
      Pratapi Hemant Patel almost 6 years
      It looks like your JSON is already a string, so there is no need to stringify it again.
    • guguli
      guguli almost 6 years
      So can you give me an example of how I can produce that message in Kafka? producer.send(new ProducerRecord<String, JsonNode>("event-orsted-v1", jsonNode));
    • Pratapi Hemant Patel
      Pratapi Hemant Patel almost 6 years
      Is it com.fasterxml.jackson.databind.JsonNode, or from another package?