How to read records in JSON format from Kafka using Structured Streaming?


From Spark's perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or the content. To be able to extract a field, you have to parse the value first.
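You can see this by printing the schema of the raw stream: the value column arrives as plain binary. A minimal sketch, assuming rawKafkaDF is the DataFrame returned by the Kafka source (as in the question below):

rawKafkaDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)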

If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json with a schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

// Fill in the actual field types, e.g. StringType, IntegerType
val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
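For example, assuming two string fields (column1 and column2 are placeholders here, not known field names), the parsed struct can then be expanded into top-level columns; a minimal sketch:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // for the $"..." syntax; sparkSession as in the question

// Placeholder schema: both fields assumed to be strings for illustration
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", StringType)
))

val kafkaDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("parsed"))
  .select($"parsed.*") // expand the struct into top-level columns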

or cast value to StringType and extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.StringType

val columns: Seq[String] = ???

// get_json_object expects a string column, so cast the binary value first;
// alias each extracted path back to its column name
val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c").alias(c))

rawKafkaDF.select(exprs: _*)

and cast the results to the desired types later.
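Since get_json_object always returns strings, numeric fields need an explicit cast afterwards; a minimal sketch, again with a hypothetical column2 holding integers:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.{IntegerType, StringType}

val typedDF = rawKafkaDF
  .select(get_json_object($"value".cast(StringType), "$.column2").alias("column2"))
  .withColumn("column2", $"column2".cast(IntegerType))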



Updated on October 27, 2020

Comments

  • Stefan Repcek over 3 years

    I am trying to use the Structured Streaming approach, based on the DataFrame/Dataset API, to load a stream of data from Kafka.

    I use:

    • Spark 2.1.0
    • Kafka 0.10
    • spark-sql-kafka-0-10

    The Spark Kafka DataSource has a fixed underlying schema:

    |key|value|topic|partition|offset|timestamp|timestampType|
    

    My data comes in JSON format and is stored in the value column. Is there a way to extract the underlying schema from the value column and expand the received DataFrame into the columns stored in value? I tried the approach below, but it does not work:

    import org.apache.spark.sql.Column

    val columns = Array("column1", "column2") // column names
    val rawKafkaDF = sparkSession.sqlContext.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", topic)
      .load()
    val columnsToSelect = columns.map(x => new Column("value." + x))
    val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

    // some analytics using stream dataframe kafkaDF

    val query = kafkaDF.writeStream.format("console").start()
    query.awaitTermination()

    Here I am getting the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside value are not known...

    Do you have any suggestions?

  • Venki about 5 years
    FWIW, I find both alternatives clunky: the schema alternative requires specifying the schema in Spark native code, so shipping a complicated schema means the code has to parse a schema file (XSD etc.) and build this object; the get_json_object alternative forces popping out paths/fields individually, and I'm not sure what the performance penalty of that is. I'd have preferred it if Apache Spark shipped with an easier way to cleanly accept a schema file and generate the Spark/Catalyst schema object (see the sketch after the comments).
  • Venki about 5 years
    Another way to get the schema for messages (it's yet another clunky way): stackoverflow.com/questions/48361177/…
  • Monu about 2 years
    I am having a hard time doing this in Python. Can anyone suggest the same?
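On Venki's point about loading a schema from a file: a StructType round-trips through its JSON representation, so one workaround is to persist the schema as JSON and rebuild it with DataType.fromJson. A minimal sketch, not an official recipe; schema.json is a hypothetical file produced by calling .json on an existing schema:

import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source

// schema.json is a hypothetical file holding the output of an existing
// schema's .json method (a StructType serializes itself to a JSON string)
val schemaJson = Source.fromFile("schema.json").mkString
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// Newer Spark versions can also parse a DDL string:
// val schema = StructType.fromDDL("column1 STRING, column2 INT")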