How to read records in JSON format from Kafka using Structured Streaming?
From the Spark perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or content, so to extract a field you have to parse it first.
If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json, providing a schema:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
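For example, once the schema placeholders are filled in, the parsed struct can be aliased and expanded back into top-level columns. This is a minimal sketch building on the snippet above; the data alias is arbitrary, and the $ syntax assumes import spark.implicits._:

// Parse the binary `value` as a JSON string, alias the resulting struct,
// and expand it into top-level columns (the "data" alias is a placeholder).
val parsedDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("data"))
  .select($"data.*")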
Alternatively, cast value to StringType and extract fields by path using get_json_object:
import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???
val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c"))

rawKafkaDF.select(exprs: _*)
and cast later to the desired types.
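For instance, here is a minimal sketch of that final cast step; the field names and IntegerType are placeholders for illustration, not part of the original question:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.{IntegerType, StringType}

// Extract fields by JSON path (they come back as strings),
// then cast each one to the type you actually want.
val typedDF = rawKafkaDF.select(
  get_json_object($"value".cast(StringType), "$.column1").cast(IntegerType).alias("column1"),
  get_json_object($"value".cast(StringType), "$.column2").alias("column2")
)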
Updated on October 27, 2020
Comments
-
Stefan Repcek over 3 years
I am trying to use the Structured Streaming approach of Spark Streaming, based on the DataFrame/Dataset API, to load a stream of data from Kafka.
I use:
- Spark 2.10
- Kafka 0.10
- spark-sql-kafka-0-10
The Spark Kafka DataSource has a defined underlying schema:
|key|value|topic|partition|offset|timestamp|timestampType|
My data comes in JSON format and is stored in the value column. I am looking for a way to extract the underlying schema from the value column and turn the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:
val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using stream dataframe kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Here I am getting the exception:
org.apache.spark.sql.AnalysisException: Can't extract value from value#337;
because at the time the stream is created, the values inside are not known... Do you have any suggestions?
-
Venki about 5 years
FWIW, I find both alternatives clunky:
+ the schema alternative requires schema specification using Spark native code, so if we are shipping a complicated schema, the code has to parse a schema file (xsd etc.) and build this object.
+ the get_json_object alternative forces popping out paths/fields individually; I'm not sure what the performance penalty of this is.
I'd have preferred it if Apache Spark shipped with an easier way to cleanly accept a schema file and generate the Spark/Catalyst schema object (see the sketch after these comments).
-
Venki about 5 years
Another way to get the schema for messages (it's yet another clunky way): stackoverflow.com/questions/48361177/…
-
Monu about 2 years
I am having a hard time doing this in Python. Can anyone suggest the same?
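Regarding the comment above about shipping a schema file: a minimal sketch, assuming Spark 2.3 or later, that builds the Catalyst schema from a DDL string (which could just as well be read from a file) instead of hand-written StructField code. The column names and types here are placeholders:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

// Build the schema from a DDL string (placeholder columns; Spark 2.3+).
val ddl = "column1 STRING, column2 INT"
val schemaFromDdl: StructType = StructType.fromDDL(ddl)

// Reuse it exactly like a hand-written schema.
// Assumes `import spark.implicits._` for the $ syntax.
val parsedFromDdlDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schemaFromDdl).alias("data"))
  .select($"data.*")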