KafkaUtils class not found in Spark streaming
Solution 1
spark-submit does not automatically put the package containing KafkaUtils on the classpath. You need to bundle it into your project JAR. To do that, create an all-inclusive uber-jar using sbt assembly. Here is an example build.sbt:
https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt
You obviously also need to add the assembly plugin to SBT.
https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
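For reference, here is a minimal sketch of such a build.sbt (the project name and versions are illustrative; the linked example is authoritative). Spark itself can be marked "provided", since spark-submit supplies it at runtime, whereas the Kafka integration is not shipped with Spark and must be bundled:

name := "kafka-word-count" // hypothetical project name

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // provided: spark-submit already puts these on the classpath
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  // not shipped with Spark, so it must go into the uber-jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1"
)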
Solution 2
Alternatively, try including all the dependency JARs when submitting the application:
./spark-submit --name "SampleApp" --deploy-mode client --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
  spark-example-1.0-SNAPSHOT.jar
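If hand-listing the JARs proves brittle, Spark 1.3+ can instead resolve the Kafka integration and its transitive dependencies (kafka, zkclient, metrics-core) from Maven Central via --packages. A sketch using the same version as above:

./spark-submit --name "SampleApp" --deploy-mode client --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.3.0 \
  spark-example-1.0-SNAPSHOT.jar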
Solution 3
The following build.sbt worked for me. It also requires you to put the sbt-assembly plugin in a file under the project/ directory.
build.sbt
name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1", // kafka
  "org.apache.hbase" % "hbase" % "0.92.1",
  "org.apache.hadoop" % "hadoop-core" % "1.0.2",
  "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)
assemblyMergeStrategy in assembly := { // key renamed from mergeStrategy in sbt-assembly 0.12+
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
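With both files in place, running sbt assembly produces the uber-jar under target/scala-2.10/, and that is the JAR to hand to spark-submit. A sketch of the workflow (the exact assembly file name depends on your name and version settings):

sbt assembly
bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/NetworkStreaming-assembly-0.1-SNAPSHOT.jar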
Comments
-
kahlo almost 4 years
I have just begun with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. Although it compiles with sbt package, when I run it I get a NoClassDefFoundError. This post seems to have the same problem, but the solution is for Maven and I have not been able to reproduce it with sbt.
KafkaApp.scala:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> "sparkGroup"
    )

    val topics = Map(
      "test" -> 1
    )

    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)

    println(s"Number of words: ${messages.count()}")
  }
}
build.sbt:
name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.1",
  "org.apache.spark" %% "spark-streaming" % "1.1.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
And I submit it with:
bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar
Error:
14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
-
Shrinivas Kulkarni over 8 years: I am also getting the same issue while using Maven. I included "org.apache.maven.plugins" in my pom.xml, but the issue remains unsolved. Is there any other parameter I have to check?
-
johnsam about 8 years: With the change, when I run sbt package I get an error: "not found: object AssemblyKeys" at the line "import AssemblyKeys._", followed by "[error] Type error in expression".
-
jurgispods almost 8 years: @johnsam Just leave out the first import line and the "assemblySettings" line; that works for me.
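For readers hitting johnsam's error: older sbt-assembly releases (0.11.x) had to be wired into build.sbt explicitly, while 0.12 and newer (including the 0.14.1 used above) auto-import their keys, so the old boilerplate must be deleted. A sketch of the difference, for illustration only:

// sbt-assembly 0.11.x style (build.sbt) -- on 0.12+ these lines cause
// "not found: object AssemblyKeys" and must be removed:
// import AssemblyKeys._
// assemblySettings

// sbt-assembly 0.12+ style: nothing to import; keys are auto-imported and
// carry an "assembly" prefix, e.g.
// mergeStrategy in assembly  ->  assemblyMergeStrategy in assembly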