Why does Spark application fail with “ClassNotFoundException: Failed to find data source: kafka” as uber-jar with sbt assembly?
Solution 1
I tried it like this and it works for me. Submit like this and let me know if you have any issues:
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1-SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
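For the job in this question, the same approach would look roughly like this (a sketch only; the connector version should match the Spark version in use, 2.0.2 here):
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2 --class io.boontadata.spark.job1.DirectKafkaAggregateEvents --master local[*] boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic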
Solution 2
The issue is the following section in build.sbt:
// META-INF discarding
assemblyMergeStrategy in assembly := {
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.
But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.
For the kafka alias to work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.
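As a rough illustration (a simplified sketch, not the actual Spark source), a provider exposes an alias by implementing DataSourceRegister:
import org.apache.spark.sql.sources.DataSourceRegister

// Simplified sketch: the real org.apache.spark.sql.kafka010.KafkaSourceProvider
// implements this trait (among others) and is listed in the META-INF/services file,
// which is what lets Spark resolve the short name "kafka".
class MyKafkaLikeProvider extends DataSourceRegister {
  override def shortName(): String = "kafka"
}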
Just to check that the real code is indeed available (it is only the "code" that registers the alias that is missing), you could use the kafka data source by its fully-qualified name (not the alias) as follows:
spark.readStream.
format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
load
You will see other problems due to missing options like kafka.bootstrap.servers, but... we're digressing.
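For completeness, here is the same check with the required option filled in (a sketch using the broker list and topic from the question):
val df = spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  option("kafka.bootstrap.servers", "ks1:9092,ks2:9092,ks3:9092").
  option("subscribe", "sampletopic").
  load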
A solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files (that would create an uber-jar with all data sources, incl. the kafka data source):
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
Solution 3
This builds on Jacek Laskowski's answer.
Those of you building your project on Maven can try this out. Add the line mentioned below to your maven-shade-plugin configuration.
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
I've put down the plugin code for the pom file as an example to show where to add the line.
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>
              META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
            </resource>
          </transformer>
        </transformers>
        <finalName>${project.artifactId}-${project.version}-uber</finalName>
      </configuration>
    </execution>
  </executions>
</plugin>
Please excuse my formatting skills.
Solution 4
In my case I also got this error while compiling with sbt, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact as part of the fat jar.
(Comments are very welcome here. The dependency was not given a scope, so it should not be assumed to be "provided".)
So I changed to deploying a normal (slim) jar and including the dependencies with the --jars parameter to spark-submit.
In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can issue the following in the sbt console:
> set retrieveManaged := true
> package
That should bring all dependencies into the lib_managed folder.
Then you can pass all those jars to spark-submit; with bash you can, for example, use something like this:
cd /path/to/your/project
JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)
spark-submit --jars "$JARLIST" [other-args] target/your-app-1.0-SNAPSHOT.jar
Solution 5
I am using Gradle as a build tool and the shadowJar plugin to create the uber jar. The solution was simply to add the file
src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
to the project.
In this file you need to put, line by line, the class names of the data sources you use; in this case it would be org.apache.spark.sql.kafka010.KafkaSourceProvider (find that class name for example here).
The reason is that Spark uses the Java ServiceLoader in its internal dependency management mechanisms.
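As a quick sanity check (a sketch, assuming spark-sql and the Kafka connector are on the classpath, e.g. in spark-shell), you can ask the ServiceLoader what it sees:
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Prints every data source alias registered via META-INF/services;
// "kafka" should show up once the service file is packaged correctly.
ServiceLoader.load(classOf[DataSourceRegister]).asScala
  .foreach(provider => println(provider.shortName()))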
Full example here.
benjguin
Updated on July 09, 2022

Comments
-
benjguin almost 2 years
I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.
My code is
package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
I added the following sbt files:
build.sbt:
name := "boontadata-spark-job1" version := "0.1" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1" libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1" // META-INF discarding assemblyMergeStrategy in assembly := { { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first } }
I also added project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
This creates an uber jar with the non-provided jars.
I submit with the following line:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
but I get this runtime error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
Is there a way to know which class is not found so that I can search the maven.org repo for that class?
The lookupDataSource source code seems to be at line 543 of https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link with the Kafka data source...
Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
-
benjguin over 7 years: complete source code is here: <github.com/boontadata/boontadata-streams/tree/…>
-
benjguin over 6 years: Hi @jithinpt, please see comments in the answer marked as THE answer.
-
ssice about 7 years: Yes, this happened on Spark 2.1.
-
teddy over 6 years: Hi, the weird thing is that "libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"" is already there in benjguin's sbt. Why do you need to manually add "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"? I know it works, but I just don't know why it does not work without that.
-
Jacek Laskowski over 6 years: I'd call it a workaround, and it does not explain why the uber-jar does not work. The root cause is the assemblyMergeStrategy in build.sbt that discards all META-INF files, incl. registrations. See my answer below.
-
MaatDeamon over 5 years: Actually it sounds like I managed to make it work with your suggestion. Just had to order the "case" in the merge strategy and put that at the top.
-
JMess about 5 years: The docs aren't clear on which dependencies should be marked provided. Seems spark-sql-kafka is not one of them. Remove the provided tag from pom.xml/build.sbt and it works.
-
Sparker0i almost 5 years: Should I still retain the case PathList("META-INF", ...) after applying case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat?
-
Jacek Laskowski almost 5 years: They're different patterns, so it depends on your app actually.
-
Piyush Kumar about 4 years: I was able to run just by replacing kafka with org.apache.spark.sql.kafka010.KafkaSourceProvider, along with making an uber jar with the maven shade plugin. Thanks a lot @JacekLaskowski
-
Jacek Laskowski about 4 years: Great! You don't need "replacing kafka by org.apache.spark.sql.kafka010.KafkaSourceProvider". Switch it back.