Parsing JSON data using Apache Kafka Streaming

11,543

I would suggest the following approach to get more control over the JSON data:

  1. Write a Serializer and a Deserializer.
  2. Create a POJO based on the JSON string. A POJO is the best way to get more control over the data.
  3. Map the data to the POJO to access the required fields.

POJO:

/**
 * Plain Java object mirroring a JSON person record.
 *
 * <p>Jackson builds instances through the {@link JsonCreator} constructor when reading JSON and
 * discovers the properties to write through the public getters. Without getters a default
 * {@code ObjectMapper} finds no properties and refuses to serialize the class.
 *
 * <p>Note: {@code @JsonRootName} only takes effect when the {@code ObjectMapper} enables
 * {@code SerializationFeature.WRAP_ROOT_VALUE} / {@code DeserializationFeature.UNWRAP_ROOT_VALUE};
 * with a default mapper the JSON is not wrapped in a {@code "person"} object.
 */
@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** No-argument constructor for frameworks that populate the object through setters. */
    public Person() {
    }

    /**
     * Creates a fully populated person; used by Jackson when binding JSON.
     *
     * @param name the person's name
     * @param personalID the person's identifier
     * @param country the person's country
     * @param occupation the person's occupation
     */
    @JsonCreator
    public Person(@JsonProperty("name") String name, @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country, @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Jackson derives the property name "personalID" from this getter, matching the
    // @JsonProperty name used by the creator constructor.
    public String getPersonalID() {
        return personalID;
    }

    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getOccupation() {
        return occupation;
    }

    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }
}

Serializer:

public class JsonSerializer<T> implements Serializer<T> {

    private ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // TODO Auto-generated method stub

    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] retval = null;
        try {
            System.out.println(data.getClass());
            retval = om.writeValueAsString(data).getBytes();
        } catch (JsonProcessingException e) {
            throw new SerializationException();
        }
        return retval;
    }

}

Deserializer:

public class JsonDeserializer<T> implements Deserializer<T> {

    private ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by kafka
     */
    public JsonDeserializer() {

    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean arg1) {
        if (type == null) {
            type = (Class<T>) map.get("type");
        }

    }

    @Override
    public T deserialize(String undefined, byte[] bytes) {
        T data = null;
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        try {
            System.out.println(getType());
            data = om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    protected Class<T> getType() {
        return type;
    }

}

Consumer:

/**
 * Kafka Streams helpers that consume {@link Person} records from the {@code "test"} topic and
 * print each person's country.
 */
public class ConsumerUtilities {

    /**
     * Builds the minimal Kafka Streams configuration for this example.
     *
     * @return properties holding the application id and a local bootstrap server
     */
    public static Properties getProperties() {
        Properties configs = new Properties();
        // NOTE(review): Kafka recommends restricting application.id to alphanumerics, '.', '-'
        // and '_'; this value contains spaces. It is also used as the consumer group id, so a new
        // value starts a fresh group without committed offsets — change it deliberately.
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    /**
     * @return a new, empty topology builder
     */
    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    /**
     * Builds and starts a stream that prints the country of every {@link Person} read from the
     * {@code "test"} topic. A JVM shutdown hook closes the streams client.
     */
    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(
                Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer,
                personJsonDeserializer);
        KStreamBuilder builder = getStreamingConsumer();

        // Topology errors propagate to the caller instead of being printed and ignored, so a
        // half-built application is never started.
        KStream<String, Person> kStream = builder.stream(Serdes.String(),
                personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {

            @Override
            public void apply(String key, Person person) {
                // JsonDeserializer returns null for empty payloads, and records may carry null
                // values; skip them instead of killing the stream thread with an NPE.
                if (person != null) {
                    System.out.println(person.getCountry());
                }
            }

        });

        final KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        // Close the client on JVM shutdown so its stream threads stop and resources are released.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                kafkaStreams.close();
            }
        }, "kafka-streams-shutdown-hook"));
        kafkaStreams.start();
    }

}

Producer:

/**
 * Kafka producer helpers that publish {@link Person} records as JSON.
 */
public class ProducerUtilities {

    /** Topic used by {@link #createRecord(Person)}. */
    private static final String DEFAULT_TOPIC = "test";

    /**
     * Creates a producer for String keys and JSON-serialized {@link Person} values.
     * The caller owns the returned producer and must close it.
     *
     * @return a new producer connected to {@code localhost:9092}
     */
    public static org.apache.kafka.clients.producer.Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        // Keys are Strings (Producer<String, Person>, read back with Serdes.String()); a
        // ByteArraySerializer would fail to serialize any non-null String key.
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");

        return new KafkaProducer<String, Person>(configProperties);
    }

    /**
     * Wraps {@code person} in an unkeyed record for the {@code "test"} topic.
     *
     * @param person value to send
     * @return the record, ready for {@code Producer.send}
     */
    public ProducerRecord<String, Person> createRecord(Person person) {
        return createRecord(DEFAULT_TOPIC, person);
    }

    /**
     * Wraps {@code person} in an unkeyed record for {@code topic}.
     *
     * @param topic destination topic
     * @param person value to send
     * @return the record, ready for {@code Producer.send}
     */
    public ProducerRecord<String, Person> createRecord(String topic, Person person) {
        return new ProducerRecord<String, Person>(topic, person);
    }

}
Share:
11,543
Mouni
Author by

Mouni

Updated on June 25, 2022

Comments

  • Mouni
    Mouni almost 2 years

    I need to read JSON data from my Kafka topic: using Kafka version 0.11, I have to write Java code that streams the JSON data present in the topic. My input is JSON data containing arrays of dictionaries.

    My requirement is to extract the "text" field (a key in each dictionary contained in the array) from the JSON data and pass all of those tweet texts to another topic using Kafka Streams.

    This is the code I have written so far. Please help me parse the data.

    Java code for streaming

    // Serde that (de)serializes record values as Jackson JsonNode trees.
    // NOTE(review): the non-generic JsonSerializer/JsonDeserializer used here are presumably
    // org.apache.kafka.connect.json.JsonSerializer/JsonDeserializer (connect-json module),
    // which implement Serializer<JsonNode>/Deserializer<JsonNode> — confirm the imports.
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    
    KStreamBuilder builder = new KStreamBuilder();
    
    // Consume the "Persons" topic with String keys and JsonNode values.
    KStream<String, JsonNode> personstwitter =builder.stream(Serdes.String(), jsonSerde, "Persons");//read each record value as a JsonNode tree
    
    
    // Currently forwards every record unchanged to "Persons-output"; nothing extracts the
    // "text" fields yet.
    // TODO: map each array element to its "text" value (e.g. with flatMapValues) before writing.
    personstwitter.to(Serdes.String(), jsonSerde,"Persons-output");