Integrating Spark Structured Streaming with the Confluent Schema Registry


Solution 1

Disclaimer

This code was only tested on a local master, and has been reported to run into serializer issues in a clustered environment. There's an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema ids into columns, looks up each unique id, and then uses schema broadcast variables, which will work better at scale.
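For reference, here is a minimal sketch of the schema-id extraction idea from that alternative (an assumption on my part, based on the Confluent wire format: a 0x00 magic byte followed by a 4-byte big-endian schema id; rawDf is the raw Kafka DataFrame loaded as in the code below):

import org.apache.spark.sql.functions.expr

// Expose the schema id as its own column so each unique id can be looked
// up once against the Registry and its schema broadcast to the executors.
// substring(value, 2, 4) grabs the 4 id bytes; conv(hex(...), 16, 10)
// turns them into a decimal string, which we then cast to int.
val withSchemaId = rawDf.withColumn(
  "valueSchemaId",
  expr("cast(conv(hex(substring(value, 2, 4)), 16, 10) as int)")
)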

There is also an external library, AbsaOSS/ABRiS, that addresses using the Registry with Spark.


Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.

Here are the dependencies needed. The code was tested with Confluent 5.x and Spark 2.4.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
        <exclusions>
            <!-- Conflicts with Spark's version -->
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

And here is the Scala implementation (only tested locally on master=local[*])

In the first section, define the imports, some fields, and a few helper methods for fetching schemas:

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the command-line args to get the Kafka details:

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then, the important method that consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

For the UDF above to work, there needs to be a deserializer that takes the DataFrame of bytes to one containing deserialized Avro:

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments
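Outside the IDE, a hedged equivalent with spark-submit (the jar name is a hypothetical placeholder for your assembled artifact):

spark-submit \
  --class App \
  --master 'local[*]' \
  my-streaming-app-with-dependencies.jar \
  -b localhost:9092 -s http://localhost:8081 -t myTopic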

Solution 2

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization, so you must deserialize the data manually. In Spark, create the Confluent RestService object to get the schema, then convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal, and map over the binary-typed "value" column with the Confluent KafkaAvroDeserializer. I strongly suggest getting into the source code for these classes, because there is a lot going on here; for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

import scala.collection.JavaConverters._ // needed for props.asJava below

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
//Assumes "spark" is the active SparkSession; its implicits provide the Dataset encoder for .map below.
import spark.implicits._
val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema, pulling the raw bytes out of the Row.
    //Note: topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()
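Note that .start() returns immediately; in a standalone application (as opposed to spark-shell) you would block on the returned query:

deserializedDSOutputStream.awaitTermination()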

Solution 3

This is an example of my code integrating Spark Structured Streaming with Kafka and the Schema Registry (code in Scala):

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//    Prints the Kafka source schema with its columns (topic, offset, partition, etc.)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}

When reading from the Kafka topic, we have this kind of schema:

key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer

As we can see, key and value are binary, so we need to cast the key to string; the value is Avro-formatted in this case, so we can decode it by calling the from_avro function.
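One caveat, grounded in Solution 4 below: Confluent-encoded records carry a 5-byte header (magic byte plus schema id) before the Avro payload, and plain from_avro does not know about it. If from_avro fails to parse the raw value, a hedged variant is to strip the header first:

import org.apache.spark.sql.functions.expr

// Hypothetical variant: skip the 5-byte Confluent header before decoding
from_avro(expr("substring(value, 6)"), jsonSchema).as("transaction")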

In addition to the Spark and Kafka dependencies, we need these dependencies:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>${confluent.version}</version>
</dependency>
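As noted in the comments below, RestService also lives in the smaller io.confluent:kafka-schema-registry-client artifact, so that dependency may be enough if you only need to fetch schemas (an assumption; verify against your Confluent version):

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>${confluent.version}</version>
</dependency>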

Solution 4

Another very simple alternative for PySpark (without full Schema Registry support such as schema registration, compatibility checks, etc.) could be:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query (parentheses let us keep comments inside the chain,
# which a backslash continuation would not allow)
query = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", topic)
    .load()
    # The magic goes here:
    # Skip the first 5 bytes (reserved by the Schema Registry encoding protocol)
    .selectExpr("substring(value, 6) as avro_value")
    .select(from_avro(col("avro_value"), schema).alias("data"))
    .select(col("data.my_field"))
    .writeStream
    .format("console")
    .outputMode("append")  # a plain projection needs "append"; "complete" requires an aggregation
    .start())
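For reference, the 5-byte skip corresponds to the Confluent wire format (see the comments below), and in a standalone script you would block on the query; a minimal follow-up sketch:

# Confluent wire format that precedes every Avro payload:
#   byte 0      magic byte (0x00)
#   bytes 1-4   4-byte big-endian schema id
#   bytes 5+    the Avro-encoded data that from_avro expects

# block until the stream terminates (not needed in a notebook or shell)
query.awaitTermination()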

Solution 5

This library, AbsaOSS/ABRiS (mentioned in the disclaimer of Solution 1), will do the job for you. It connects to the Confluent Schema Registry through Spark Structured Streaming.

For Confluent, it copes with the schema id that is sent along with the payload.

In the README you will find a code snippet of how to do it.
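For orientation, a hedged sketch of what that snippet looks like with the ABRiS 4.x-style API (the exact API differs between ABRiS versions, so treat the names here as assumptions and defer to the README for your version; df is the raw Kafka DataFrame):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Assumed ABRiS 4.x-style config: fetch the latest value schema for the topic
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("myTopic")
  .usingSchemaRegistry("http://localhost:8081")

val deserialized = df.select(from_avro(col("value"), abrisConfig).as("data"))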

DISCLOSURE: I work for ABSA and I developed this library.



Comments

  • Souhaib Guitouni
    Souhaib Guitouni almost 3 years

    I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but the integration with Spark Structured Streaming seems to be impossible.

    I have seen this question, but I was unable to get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

  • Jeremy
    Jeremy about 6 years
    Can you elaborate on the comment "topic name is actually unused in the source code, just required by the signature. Weird right?"
  • tstites
    tstites about 6 years
    It seems the signature for the deserialize method calls for a string, but it is unused in the function body. KafkaAvroDeserializer.java
  • Mikhail
    Mikhail almost 6 years
    The description of this lib seems incorrect; for example, the description mentions version 2.0.0, but in Maven I only saw 1.0.0.
  • Mikhail
    Mikhail almost 6 years
    Also, I cannot build the project. I have an error: [ERROR] E:\projects\dvsts\ABRiS\src\test\scala\za\co\absa\abris\avro\read\confluent\ScalaConfluentKafkaAvroDeserializerSpec.scala:113: error: class MockedSchemaRegistryClient needs to be abstract, since: [ERROR] it has 8 unimplemented members.
  • Felipe Martins Melo
    Felipe Martins Melo almost 6 years
    @Mikhail, the new version was updated yesterday, and probably when you checked Maven Central it had not yet been synchronized. You can find it here: mvnrepository.com/artifact/za.co.absa/abris/2.0.0
  • OneCricketeer
    OneCricketeer over 5 years
    Would be nice to see an example usage here on this answer
  • Vibhor Nigam
    Vibhor Nigam about 5 years
    Hi, I am trying to implement the same code. I am getting an exception at keyDeserializer.deserialize(topicName, row.key, keySchema).toString, saying keySchema is org.apache.avro.Schema whereas Array[Byte] is required. I checked the source code and it looks like it expects Array[Byte]: github.com/confluentinc/schema-registry/blob/master/…. Is there something I am missing here?
  • Pyd
    Pyd about 5 years
    @cricket_007, does this library work with the Spark Java API? I cannot get the from_avro method after all the imports. Could you please help?
  • OneCricketeer
    OneCricketeer about 5 years
    Only Databricks supports the registry, not Apache Spark itself
  • OneCricketeer
    OneCricketeer about 5 years
    @Vignesh I have not used it. But yes, any Scala library can be imported into Java code because it's all still running in the same JVM.
  • Karthikeyan
    Karthikeyan almost 5 years
    @tstites, I'm not able to find the io.confluent.kafka.schemaregistry.client.rest.RestService package in any Confluent repositories. Can you give the location of this jar or the Maven repository for this package?
  • OneCricketeer
    OneCricketeer almost 5 years
    @Karthikeyan github.com/confluentinc/schema-registry/blob/master/client/src/… is part of io.confluent:kafka-schema-registry-client, and the repo is here: docs.confluent.io/current/clients/…
  • xav
    xav over 4 years
    Does Databricks support Confluent's Schema Registry? Or another type of schema registry? In case you can use Databricks, does anyone know how to pass the schema registry credentials? I ask because the examples I find do not comment on it.
  • Leibnitz
    Leibnitz over 4 years
    Could you please explain how we can pass schema registry credentials in your program, in case it's needed?
  • davidretana
    davidretana over 4 years
    I don't need to authenticate against the schema registry, but I've found this information: (docs.confluent.io/current/schema-registry/security/index.html), and in this link you can configure Schema Registry authorization for communicating with the RBAC Kafka cluster. (docs.confluent.io/current/schema-registry/security/…)
  • Antón R. Yuste
    Antón R. Yuste about 4 years
    To pass schema registry credentials, see this answer: stackoverflow.com/a/58930199/6002794
  • Rafael Barros
    Rafael Barros almost 4 years
    for some reason the broadcast strings aren't working inside the map. Why?
  • Learnis
    Learnis almost 4 years
    It's not working in standalone cluster mode; it throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string)
  • OneCricketeer
    OneCricketeer almost 4 years
    @xav Yes. Databricks partners with Confluent to support that Avro + Schema Registry functionality
  • OneCricketeer
    OneCricketeer almost 4 years
    I'm not sure you need to broadcast for each batch, also topic name isn't used by the deserialize, I believe
  • Learnis
    Learnis almost 4 years
    Yeah @OneCricketeer. It's not working in YARN either. I'm getting a null exception while deserializing at the line kafkaAvroDeserializer.deserialize(bytes). I tried to handle it with Try{}.getOrElse(), but if I do that I'm not getting data in the DataFrame. Is it solvable? Any workaround for this?
  • Learnis
    Learnis almost 4 years
    Will this work in standalone cluster or YARN mode?
  • Learnis
    Learnis almost 4 years
    Or are there any working solutions in this Stack Overflow post for cluster mode?
  • OneCricketeer
    OneCricketeer almost 4 years
    @Learnis If you get an NPE in deserialize, then the bytes are probably null. I believe the deserializer is correctly initialized in the closure
  • scalacode
    scalacode almost 4 years
    @VibhorNigam, I'm getting the same row.value error as you. Did you fix it?
  • Vibhor Nigam
    Vibhor Nigam almost 4 years
    @scalacode Unfortunately no. I got another source with the same data, so I moved on from this issue.
  • soMuchToLearnAndShare
    soMuchToLearnAndShare over 3 years
    Hi @OneCricketeer, what spark.version and confluent.version did you use?
  • OneCricketeer
    OneCricketeer over 3 years
    @Minnie Probably 2.4.x and 5.x
  • soMuchToLearnAndShare
    soMuchToLearnAndShare over 3 years
    Hi timothyzhang, you did not need the UDF like @OneCricketeer did?
  • soMuchToLearnAndShare
    soMuchToLearnAndShare over 3 years
    Hi @OneCricketeer, why do the key and value have a single quote in front of them? Also, I had to add the extra dependency "org.apache.spark" %% "spark-sql" (sbt syntax) for the compiler to be happy with SparkSession etc., and then it had the error: type mismatch; [error] found: Symbol [error] required: org.apache.spark.sql.Column [error] callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
  • OneCricketeer
    OneCricketeer over 3 years
    @Minnie The single quote creates a Symbol object. The way to get a column would be Column("key") or $"key", but that was more typing
  • soMuchToLearnAndShare
    soMuchToLearnAndShare over 3 years
    Hi @OneCricketeer, when run in IntelliJ, it has the exception Exception in thread "main" java.net.ConnectException: Connection refused (Connection refused) ... at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272). Basically I could not connect to the Schema Registry. Note Kafka Confluent 5.5.1 is running. I tried the command nc -vz localhost 8081 to verify, and yes, it has the error nc: connect to localhost port 8081 (tcp) failed: Connection refused. Any suggestions on this? Thanks a lot.
  • soMuchToLearnAndShare
    soMuchToLearnAndShare over 3 years
    Hi @timothyzhang, did you experience this issue in your version test? stackoverflow.com/questions/63846392/…
  • OneCricketeer
    OneCricketeer over 3 years
    @Minnie Sounds like you need to start a Schema Registry?
  • Venkat
    Venkat about 3 years
    "The magic goes here": this worked for me. But why do we need to skip the first 5 bytes?
  • dudssource
    dudssource about 3 years
    Hi @Venkat, this is necessary because Confluent reserves these first bytes for its internal wire format.
  • Rafa
    Rafa almost 3 years
    How can we use this function in Spark Structured Streaming? I have Spark 2.3.2, and no from_avro or to_avro function is available.
  • OneCricketeer
    OneCricketeer over 2 years
    @Rafa Then you need to add databricks spark-avro library
  • Lars Skaug
    Lars Skaug about 2 years
    Thank you!!! I spent hours scratching my head and this simple solution really works.
  • Ivan
    Ivan almost 2 years
    This works!!! I spent several hours trying to solve this issue. Nice workaround.