Integrating Spark Structured Streaming with the Confluent Schema Registry
Solution 1
Disclaimer
This code was only tested on a local master, and has been reported runs into serializer issues in a clustered environment. There's an alternative solution (step 7-9, with Scala code in step 10) that extracts out the schema ids to columns, looks up each unique ID, and then uses schema broadcast variables, which will work better, at scale.
Also, there is an external library AbsaOSS/ABRiS
that also addresses using the Registry with Spark
Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.
Here are the dependencies needed. Code tested with Confluent 5.x and Spark 2.4
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
<exclusions>
<!-- Conflicts with Spark's version -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
And here is the Scala implementation (only tested locally on master=local[*]
)
First section, define the imports, some fields, and a few helper methods to get schemas
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode
object App {
private var schemaRegistryClient: SchemaRegistryClient = _
private var kafkaAvroDeserializer: AvroDeserializer = _
def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
}
def avroSchemaToSparkSchema(avroSchema: String) = {
SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
}
// ... continues below
Then define a simple main method that parses the CMD args to get Kafka details
def main(args: Array[String]): Unit = {
val cmd: CommandLine = parseArg(args)
val master = cmd.getOptionValue("master", "local[*]")
val spark = SparkSession.builder()
.appName(App.getClass.getName)
.master(master)
.getOrCreate()
val bootstrapServers = cmd.getOptionValue("bootstrap-server")
val topic = cmd.getOptionValue("topic")
val schemaRegistryUrl = cmd.getOptionValue("schema-registry")
consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)
spark.stop()
}
// ... still continues
Then, the important method that consumes the Kafka topic and deserializes it
private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
import spark.implicits._
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
kafkaAvroDeserializer.deserialize(bytes)
)
// Load the raw Kafka topic (byte stream)
val rawDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
// Deserialize byte stream into strings (Avro fields become JSON)
import org.apache.spark.sql.functions._
val jsonDf = rawDf.select(
// 'key.cast(DataTypes.StringType), // string keys are simplest to use
callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
callUDF("deserialize", 'value).as("value")
// excluding topic, partition, offset, timestamp, etc
)
// Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
val dfValueSchema = {
val rawSchema = lookupTopicSchema(topic)
avroSchemaToSparkSchema(rawSchema)
}
// Apply structured schema to JSON stream
val parsedDf = jsonDf.select(
'key, // keys are usually plain strings
// values are JSONified Avro records
from_json('value, dfValueSchema.dataType).alias("value")
).select(
'key,
$"value.*" // flatten out the value
)
// parsedDf.printSchema()
// Sample schema output
// root
// |-- key: string (nullable = true)
// |-- header: struct (nullable = true)
// | |-- time: long (nullable = true)
// | ...
// TODO: Do something interesting with this stream
parsedDf.writeStream
.format("console")
.outputMode(OutputMode.Append())
.option("truncate", false)
.start()
.awaitTermination()
}
// still continues
The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.
private def parseArg(args: Array[String]): CommandLine = {
import org.apache.commons.cli._
val options = new Options
val masterOption = new Option("m", "master", true, "Spark master")
masterOption.setRequired(false)
options.addOption(masterOption)
val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
bootstrapOption.setRequired(true)
options.addOption(bootstrapOption)
val topicOption = new Option("t", "topic", true, "Kafka topic")
topicOption.setRequired(true)
options.addOption(topicOption)
val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
schemaRegOption.setRequired(true)
options.addOption(schemaRegOption)
val parser = new BasicParser
parser.parse(options, args)
}
// still continues
In order for the UDF above to work, then there needed to be a deserializer to take the DataFrame of bytes to one containing deserialized Avro
// Simple wrapper around Confluent deserializer
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
this()
// TODO: configure the deserializer for authentication
this.schemaRegistry = client
}
override def deserialize(bytes: Array[Byte]): String = {
val value = super.deserialize(bytes)
value match {
case str: String =>
str
case _ =>
val genericRecord = value.asInstanceOf[GenericRecord]
genericRecord.toString
}
}
}
} // end 'object App'
Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic
to Run Configurations > Program Arguments
Solution 2
It took me a couple months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization. You must manually deserialize the data. In spark, create the confluent rest service object to get the schema. Convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal. Then map over the binary typed "value" column with the Confluent KafkaAvroDeSerializer. I strongly suggest getting into the source code for these classes because there is a lot going on here, so for brevity I'll leave out many details.
//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
case class DeserializedFromKafkaRecord(key: String, value: String)
val schemaRegistryURL = "http://127.0.0.1:8081"
val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"
//create RestService object
val restService = new RestService(schemaRegistryURL)
//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)
//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)
//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null
//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 20) //remove for prod
.load()
//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
row =>
if (keyDeserializer == null) {
keyDeserializer = new KafkaAvroDeserializer
keyDeserializer.configure(props.asJava, true) //isKey = true
}
if (valueDeserializer == null) {
valueDeserializer = new KafkaAvroDeserializer
valueDeserializer.configure(props.asJava, false) //isKey = false
}
//Pass the Avro schema.
val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
val deserializedValueString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString
DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}
val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
Solution 3
This is an example of my code integrating spark structured streaming with kafka and schema registry (code in scala)
import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col
object KafkaConsumerAvro {
def main(args: Array[String]): Unit = {
val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
val SCHEMA_REGISTRY_URL = "http://localhost:8081"
val TOPIC = "transactions"
val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest") // from starting
.load()
// Prints Kafka schema with columns (topic, offset, partition e.t.c)
df.printSchema()
// Create REST service to access schema registry and retrieve topic schema (latest)
val restService = new RestService(SCHEMA_REGISTRY_URL)
val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
val jsonSchema = valueRestResponseSchema.getSchema
val transactionDF = df.select(
col("key").cast("string"), // cast to string from binary value
from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
col("topic"),
col("offset"),
col("timestamp"),
col("timestampType"))
transactionDF.printSchema()
// Stream data to console for testing
transactionDF.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
}
}
When reading from kafka topic, we have this kind of schema:
key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer |
As we can see, key and value are binary so we need to cast key as string and in this case, value is avro formatted so we can achieve this by calling from_avro
function.
In adition to Spark and Kafka dependencies, we need this dependencies:
<!-- READ AND WRITE AVRO DATA -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>
Solution 4
Another very simple alternative for pyspark
(without full support for schema registry like schema registration, compatibility check, etc.) could be:
import requests
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *
# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"
# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))
# error check
response.raise_for_status()
# extract the schema from the response
schema = response.text
# run the query
query = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", kafka_brokers) \
.option("subscribe", topic) \
.load() \
# The magic goes here:
# Skip the first 5 bytes (reserved by schema registry encoding protocol)
.selectExpr("substring(value, 6) as avro_value") \
.select(from_avro(col("avro_value"), schema).alias("data")) \
.select(col("data.my_field")) \
.writeStream \
.format("console") \
.outputMode("complete") \
.start()
Solution 5
This library will do the job for you. It connects to Confluent Schema Registry through Spark Structured Stream.
For Confluent, it copes with the schema id that is sent along with the payload.
In the README you will find a code snippet of how to do it.
DISCLOSURE: I work for ABSA and I developed this library.
Related videos on Youtube
Souhaib Guitouni
Updated on July 24, 2021Comments
-
Souhaib Guitouni almost 3 years
I'm using a Kafka Source in Spark Structured Streaming to receive Confluent encoded Avro records. I intend to use Confluent Schema Registry, but the integration with spark structured streaming seems to be impossible.
I have seen this question, but unable to get it working with the Confluent Schema Registry. Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)
-
Souhaib Guitouni about 6 yearsThanks @VinothChinnasamy but your link is about classic spark streaming, I'm talking about spark STRUCTURED streaming
-
G.Saleh about 6 yearsyou need to respect kafka spark integration : spark.apache.org/docs/latest/…
-
Souhaib Guitouni about 6 years@G.Saleh thank you but you misunderstand the question.
-
Souhaib Guitouni about 6 yearsPlease upvote the confluence issue about it : github.com/confluentinc/schema-registry/issues/755
-
OneCricketeer about 6 yearsPossible duplicate of reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)
-
Souhaib Guitouni about 6 years@cricket_007 no it's not the same thing
-
OneCricketeer about 6 yearsWhy not? See
schemaRegistryUrl
in this answer? stackoverflow.com/a/41146808/2308683 -
Souhaib Guitouni about 6 yearsThat question is not about the schema registry. The answer for it was not clear, but with your edits it's much clearer, thank you!
-
arunmahadevan about 5 yearsCheck out this project - github.com/hortonworks-spark/spark-schema-registry This allows you to integrate Hortonwork's Schema registry (github.com/hortonworks/registry) with Spark. It may also be possible to plug this into Confluent Schema registry (since the Hortonworks Schema registry is compatible with the Confluent one), but you will need to explore it further.
-
arunmahadevan about 5 yearsCheck out this project - github.com/hortonworks-spark/spark-schema-registry This allows you to integrate Hortonwork's Schema registry (github.com/hortonworks/registry) with Spark. It may also be possible to plug this into Confluent Schema registry (since the Hortonworks Schema registry is compatible with the Confluent one), but you will need to explore it further.
-
-
Jeremy about 6 yearsCan you elaborate on the comment
topic name is actually unused in the source code, just required by the signature. Weird right?
-
tstites about 6 yearsIt seems the signature for the deserialize method calls for a string, but it is unused in the function body. KafkaAvroDeserializer.java
-
Mikhail almost 6 yearsdescription in this lib seems not correct for example in decripton there is 2.0.0 version but in maven i saw only 1.0.0
-
Mikhail almost 6 yearsalso i can not build the project. i have an error: [ERROR] E:\projects\dvsts\ABRiS\src\test\scala\za\co\absa\abris\avro\read\confluent\ScalaConfluentKafkaAvroDeserializerSpec.scala:113: error: class MockedSchemaRegistryClient needs to be abstract, since: [ERROR] it has 8 unimplemented members.
-
Felipe Martins Melo almost 6 years@Mikhail, the new version was updated yesterday, and probably when you checked Maven Central it had not yet been synchronized. You can find it here: mvnrepository.com/artifact/za.co.absa/abris/2.0.0
-
OneCricketeer over 5 yearsWould be nice to see an example usage here on this answer
-
Vibhor Nigam about 5 yearsHi, I am trying to implement the same code. I am getting an exception at keyDeserializer.deserialize(topicName, row.key, keySchema).toString , saying keySchema is org.apache.avro.Schema where as required is Array[Byte]. Checked the source code it looks like it expects Array[Byte] github.com/confluentinc/schema-registry/blob/master/…. Something i am missing here ?
-
Pyd about 5 years@cricket_007, does this library work with spark Java api, as I cannot able to get
fromavro
method after all the imports. could you please ? -
OneCricketeer about 5 yearsOnly Databricks supports the registry, not Apache Spark itself
-
OneCricketeer about 5 years@Vignesh I have not used it. But yes, any Scala library can be imported into Java code because it's all still running in the same JVM.
-
Karthikeyan almost 5 years@tstites, im not able to find io.confluent.kafka.schemaregistry.client.rest.RestService this package in any confluent repositories, can you give location of this jar or mvn repository for this package?
-
OneCricketeer almost 5 years@Karthikeyan github.com/confluentinc/schema-registry/blob/master/client/src/… is part of
io.confluent:kafka-schema-registry-client
And the repo is here docs.confluent.io/current/clients/… -
xav over 4 yearsDoes Databricks support Schema Registry of Confluent? Or another type of schema registry. In case you can use Databricks, someone knows how to pass the schema registry credentials.I say this because the examples I find do not comment on it.
-
Leibnitz over 4 yearsCould you please explain how we can pass schema registry credentials in you program in case needed?
-
davidretana over 4 yearsI don't need to authenticate against schema registry, but i've found this information: (docs.confluent.io/current/schema-registry/security/index.html), and in this link you can configure Schema Registry authorization for communicating with the RBAC Kafka cluster. (docs.confluent.io/current/schema-registry/security/…)
-
Antón R. Yuste about 4 yearsTo pass schema registry credentials, see this answer: stackoverflow.com/a/58930199/6002794
-
Rafael Barros almost 4 yearsfor some reason the broadcast strings aren't working inside the
map
. Why? -
Learnis almost 4 yearsIt's not working in standlone cluster mode..throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string)
-
OneCricketeer almost 4 years@xav Yes. Databricks partners with Confluent to support that Avro + Schema Registry functionality
-
OneCricketeer almost 4 yearsI'm not sure you need to broadcast for each batch, also topic name isn't used by the deserialize, I believe
-
Learnis almost 4 yearsyeah @OneCricketeer. It's not working in yarn as well. Getting a null exception while deserializing line kafkaAvroDeserializer.deserialize(bytes). I try to handle with Try{}.getOrelse(), but if I do that I'm not getting data in data frame. Is it solvable ? any workaround for this ?
-
Learnis almost 4 yearsWill this work in standalone cluster or yarn mode ?
-
Learnis almost 4 yearsOr any working solutions in this stackoverflow post ? for cluster mode
-
OneCricketeer almost 4 years@Learnis If you get an NPE in deserialize, then the bytes are probably null. I believe the deserializer is correctly initialized in the closure
-
scalacode almost 4 years@VibhorNigam, getting the same row.value error as you , did you fix it
-
Vibhor Nigam almost 4 years@scalacode unfortunately no. i got another source with same data so moved on from this issue.
-
soMuchToLearnAndShare over 3 yearsHi @OneCricketeer, what is the
spark.version
that you used? andconfluent.version
? -
OneCricketeer over 3 years@Minnie Probably 2.4.x and 5.x
-
soMuchToLearnAndShare over 3 yearsHi timothyzhang, you did not need the UDF like @OneCricketeer did?
-
soMuchToLearnAndShare over 3 yearsHi @OneCricketeer, why the key and value has a single quote in front of it? also, I had to add extra dependency ``` "org.apache.spark" %% "spark-sql"``` (sbt syntax), for the compiler to be happy for SparkSession etc. and then it has below error
type mismatch; [error] found : Symbol [error] required: org.apache.spark.sql.Column [error] callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
-
OneCricketeer over 3 years@Minnie The single quote creates a Symbol object. The way to get a column would be
Column("key")
or$"key"
, but that was more typing -
soMuchToLearnAndShare over 3 yearsHi @OneCricketeer, when run in inteliJ, it has exception
Exception in thread "main" java.net.ConnectException: Connection refused (Connection refused) ... ... at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)
. basically i could not connect to the SchemaRegistry. Note Kafka Confluent 5.5.1 is running. I tried this commandnc -vz localhost 8081
to verify, and yes, it has errornc: connect to localhost port 8081 (tcp) failed: Connection refused
. Any suggestions on this? Thanks a lot. -
soMuchToLearnAndShare over 3 yearsHi @timothyzhang, did you expereience this issue in your version test? stackoverflow.com/questions/63846392/…
-
OneCricketeer over 3 years@Minnie Sound like you need to start a Schema Registry?
-
Venkat about 3 years#The magic goes here: this worked for me. But why do we need to skip first 5 bytes.
-
dudssource about 3 yearsHi @Venkat, this is necessary because Confluent reserves this first bytes for it's internal wire format
-
Rafa almost 3 yearsHow can we use this function in spark structured streaming , i am having spark 2.3.2 no from_avro and to_avro function available
-
OneCricketeer over 2 years@Rafa Then you need to add databricks spark-avro library
-
Lars Skaug about 2 yearsThank you!!! I spent hours scratching my head and this simple solution really works.
-
Ivan almost 2 yearsThis works!!! I spent several hours trying to solve this issue. Nice workaround.