Why does Spark application fail with “ClassNotFoundException: Failed to find data source: kafka” as uber-jar with sbt assembly?

33,682

Solution 1

This worked for me. Submit the application as follows and let me know if you run into any issues:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar

Solution 2

The issue is the following section in build.sbt:

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.

But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.

For the kafka alias to work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.

To check that the real code is indeed available and only the "code" that registers the alias is missing, you can use the kafka data source by its fully-qualified name (not the alias) as follows:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

You will see other problems due to missing options like kafka.bootstrap.servers, but...we're digressing.
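As a rough sketch of that check with the required options filled in, something like the following could be used. The broker list and topic are the ones from the question's spark-submit line; the object name and the console sink are only assumptions for illustration, and the job still needs the spark-sql-kafka-0-10 classes on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration.
object KafkaByClassName {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-by-class-name").getOrCreate()

    // Fully-qualified provider class instead of the "kafka" alias,
    // so no META-INF/services registration is needed for the lookup.
    val lines = spark.readStream
      .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
      .option("kafka.bootstrap.servers", "ks1:9092,ks2:9092,ks3:9092") // brokers from the question
      .option("subscribe", "sampletopic")                              // topic from the question
      .load()
      .selectExpr("CAST(value AS STRING)")

    // Print incoming records to the console until the query is stopped.
    lines.writeStream.format("console").start().awaitTermination()
  }
}
```

If this runs while format("kafka") fails, the classes are in the uber-jar and only the alias registration is missing.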

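A solution is to use MergeStrategy.concat for all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files (that would create an uber-jar with all data sources, incl. the kafka data source). The case has to come before the generic META-INF case, because the first matching case wins: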

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
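Put together with the merge strategy from the question, the whole block might look like the sketch below. It keeps the sbt-assembly 0.14.x syntax used in the question. Writing the path as a PathList pattern instead of a string literal is my own choice here, on the assumption that it is less sensitive to the platform's path separator; the string form above should match just as well on Unix-like systems.

```scala
// build.sbt (sketch, not a drop-in replacement for every project)
assemblyMergeStrategy in assembly := {
  // Keep and concatenate the data source registrations from all jars.
  // This case must come first: cases are tried top to bottom.
  case PathList("META-INF", "services", "org.apache.spark.sql.sources.DataSourceRegister") =>
    MergeStrategy.concat
  // Everything else under META-INF is still discarded, as in the question.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
```

Whether the remaining META-INF entries can safely be discarded depends on the other libraries in the application.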

Solution 3

This is in view of Jacek Laskowski's answer.

Those of you building your project on maven can try this out. Add the line mentioned below to your maven-shade-plugin.

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

I've put down the plugin code for the pom file as an example to show where to add the line.


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>
                            META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                        </resource>
                    </transformer>
                </transformers>
                <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>

Please excuse my formatting skills.

Solution 4

In my case I also got this error while compiling with sbt, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact as part of the fat jar.

(Comments on this are very welcome. No scope was specified for the dependency, so it should not be treated as "provided".)

So I changed to deploying a normal (slim) jar and including the dependencies with the --jars parameter to spark-submit.

In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can, in the sbt console, issue:

> set retrieveManaged := true
> package

That should bring all dependencies to the lib_managed folder.

Then you can collect all those files and pass them to spark-submit. With bash, for example, you can use something like this:

cd /path/to/your/project

JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)

spark-submit [other-args] --jars "$JARLIST" target/your-app-1.0-SNAPSHOT.jar

Solution 5

I am using gradle as a build tool and the shadowJar plugin to create the uber-jar. The solution was simply to add a file

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister  

to the project.

In this file you need to put, one per line, the class names of the data sources you use; in this case it would be org.apache.spark.sql.kafka010.KafkaSourceProvider (find that class name for example here).

The reason is that Spark uses the Java ServiceLoader in its internal dependency management mechanisms.

Full example here.
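To see which registrations actually ended up on the classpath, a small diagnostic along these lines may help. It is only a sketch using the JDK and the Scala standard library (the Scala 2.11-era JavaConverters from the question's build); the object and method names are made up for illustration. It reads the same service files that ServiceLoader would consult and prints the provider class names it finds.

```scala
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical helper, not part of Spark.
object ListDataSourceRegistrations {
  val ServiceFile = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

  // Returns every provider class name listed in any copy of the service file
  // visible to the given class loader (blank lines and # comments skipped).
  def registeredProviders(loader: ClassLoader): Seq[String] =
    loader.getResources(ServiceFile).asScala.toSeq.flatMap { url =>
      val src = Source.fromURL(url, "UTF-8")
      try src.getLines().map(_.trim).filter(l => l.nonEmpty && !l.startsWith("#")).toList
      finally src.close()
    }

  def main(args: Array[String]): Unit =
    registeredProviders(Thread.currentThread().getContextClassLoader).foreach(println)
}
```

Run with the uber-jar on the classpath: if org.apache.spark.sql.kafka010.KafkaSourceProvider is not in the output, the kafka alias cannot be resolved.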

Author: benjguin

Updated on July 09, 2022

Comments

  • benjguin
    benjguin almost 2 years

    I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.

    My code is

    package io.boontadata.spark.job1
    
    import org.apache.spark.sql.SparkSession
    
    object DirectKafkaAggregateEvents {
      val FIELD_MESSAGE_ID = 0
      val FIELD_DEVICE_ID = 1
      val FIELD_TIMESTAMP = 2
      val FIELD_CATEGORY = 3
      val FIELD_MEASURE1 = 4
      val FIELD_MEASURE2 = 5
    
      def main(args: Array[String]): Unit = {
        if (args.length < 3) {
          System.err.println(s"""
            |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
            |  <brokers> is a list of one or more Kafka brokers
            |  <subscribeType> sample value: subscribe
            |  <topics> is a list of one or more kafka topics to consume from
            |
            """.stripMargin)
          System.exit(1)
        }
    
        val Array(bootstrapServers, subscribeType, topics) = args
    
        val spark = SparkSession
          .builder
          .appName("boontadata-spark-job1")
          .getOrCreate()
    
        import spark.implicits._
    
        // Create DataSet representing the stream of input lines from kafka
        val lines = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option(subscribeType, topics)
          .load()
          .selectExpr("CAST(value AS STRING)")
          .as[String]
    
        // Generate running word count
        val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
    
        // Start running the query that prints the running counts to the console
        val query = wordCounts.writeStream
          .outputMode("complete")
          .format("console")
          .start()
    
        query.awaitTermination()
      }
    
    }
    

    I added the following sbt files:

    build.sbt:

    name := "boontadata-spark-job1"
    version := "0.1"
    scalaVersion := "2.11.7"
    
    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
    libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
    libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
    
    // META-INF discarding
    assemblyMergeStrategy in assembly := { 
       {
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first
       }
    }
    

    I also added project/assembly.sbt

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
    

    This creates an uber-jar with the non-provided jars.

    I submit with the following line:

    spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
    

    but I get this runtime error:

    Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
            at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
            at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
            at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
            at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
            at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
            at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
            at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
            at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
            at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
            at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
            at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
            at scala.util.Try$.apply(Try.scala:192)
            at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
            at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
            at scala.util.Try.orElse(Try.scala:84)
            at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
            ... 18 more
    16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
    

    Is there a way to know which class is not found, so that I can search the maven.org repo for that class?

    The lookupDataSource source code seems to be at line 543 at https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link with Kafka data source...

    Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

  • ssice
    ssice about 7 years
    Yes, this happened on Spark 2.1.
  • teddy
    teddy over 6 years
    Hi, the weird thing is that benjguin's sbt build already has libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2". Why do you need to manually add "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"? I know it works, but I don't understand why it does not work without that.
  • Jacek Laskowski
    Jacek Laskowski over 6 years
    I'd call it a workaround; it does not explain why the uber-jar does not work. The root cause is assemblyMergeStrategy in build.sbt that discards all META-INF files incl. registrations. See my answer below.
  • MaatDeamon
    MaatDeamon over 5 years
    Actually sounds like I managed to make it work with your suggestion. Just had to order the "case" in the merge strategy and put that at the top.
  • JMess
    JMess about 5 years
    The docs aren't clear on which dependencies should be marked provided. Seems spark-sql-kafka is not one of them. Remove the provided tag from pom.xml/build.sbt and it works
  • Sparker0i
    Sparker0i almost 5 years
    Should I still retain the case PathList("META-INF", ...) after applying case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat?
  • Jacek Laskowski
    Jacek Laskowski almost 5 years
    They're different patterns so it depends on your app actually.
  • Piyush Kumar
    Piyush Kumar about 4 years
    I was able to run just by replacing kafka with org.apache.spark.sql.kafka010.KafkaSourceProvider along with making an uber-jar with the maven shade plugin. Thanks a lot @JacekLaskowski
  • Jacek Laskowski
    Jacek Laskowski about 4 years
    Great! You don't need "replacing kafka by org.apache.spark.sql.kafka010.KafkaSourceProvider". Switch it back.