KafkaUtils class not found in Spark streaming


Solution 1

spark-submit does not automatically put the package containing KafkaUtils on the classpath. You need to include it in your application JAR. For that, create an all-inclusive uber-jar using sbt assembly. Here is an example build.sbt:

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

You also need to add the assembly plugin to sbt:

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
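For reference, here is a minimal sketch of both files (the project name and version numbers are assumptions; match them to your Spark installation). Spark core and streaming are marked "provided" because spark-submit already supplies them at runtime, while the Kafka connector must be bundled into the uber-jar:

// build.sbt (minimal sketch)
name := "KafkaWordCount"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1"  // must end up inside the uber-jar
)

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Running sbt assembly then produces a single JAR under target/scala-2.10/ that contains KafkaUtils and its transitive dependencies.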

Solution 2

Try including all the dependency jars when submitting the application:

./spark-submit \
  --name "SampleApp" \
  --deploy-mode client \
  --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
  spark-example-1.0-SNAPSHOT.jar
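The jars listed after --jars are added to the classpath of both the driver and the executors, so no uber-jar is needed with this approach. The file names above assume Spark 1.3.0 and Kafka 0.8.2.0; adjust them to the versions actually installed on your machine.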

Solution 3

The following build.sbt worked for me. It also requires you to put the sbt-assembly plugin in a file under the project/ directory.

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",         // kafka
  "org.apache.hbase" % "hbase" % "0.92.1",
  "org.apache.hadoop" % "hadoop-core" % "1.0.2",
  "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}
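The merge strategy matters because the bundled jars ship overlapping META-INF entries: manifests and signature files must be discarded (a signed jar whose contents have changed would fail verification at runtime), service-loader registrations are merged line by line, and reference.conf files are concatenated so that Akka and other Typesafe Config users keep their defaults.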

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
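With both files in place, run sbt assembly instead of sbt package and pass the resulting fat jar to spark-submit; it is written under target/scala-2.10/, and its exact name is derived from your name and version settings.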


Comments

  • kahlo
    kahlo almost 4 years

    I have just begun with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. Although it compiles with sbt package, when I run it, I get NoClassDefFoundError. This post seems to describe the same problem, but the solution is for Maven and I have not been able to reproduce it with sbt.

    KafkaApp.scala:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object KafkaApp {
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(1))
        val kafkaParams = Map(
            "zookeeper.connect" -> "localhost:2181",
            "zookeeper.connection.timeout.ms" -> "10000",
            "group.id" -> "sparkGroup"
        )
    
        val topics = Map(
            "test" -> 1
        )
    
    // stream of (topic, message) pairs from Kafka
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
    messages.count().print()  // print the number of messages in each batch

    ssc.start()               // start the streaming computation
    ssc.awaitTermination()    // keep the application running
      }
    }
    

    build.sbt:

    name := "Simple Project"
    
    version := "1.1"
    
    scalaVersion := "2.10.4"
    
    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "1.1.1",
        "org.apache.spark" %% "spark-streaming" % "1.1.1",
        "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
    )
    
    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    

    And I submit it with:

    bin/spark-submit \
      --class "KafkaApp" \
      --master local[4] \
      target/scala-2.10/simple-project_2.10-1.1.jar
    

    Error:

    14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
        at KafkaApp$.main(KafkaApp.scala:28)
        at KafkaApp.main(KafkaApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    
  • Shrinivas Kulkarni
    Shrinivas Kulkarni over 8 years
    I am also getting the same issue using Maven. I included "org.apache.maven.plugins" in my pom.xml, but the issue is unsolved. Is there any other parameter I have to check?
  • johnsam
    johnsam about 8 years
    With the change, if I run sbt package, I get an error: error: not found: object AssemblyKeys import AssemblyKeys._ ^ [error] Type error in expression
  • jurgispods
    jurgispods almost 8 years
    @johnsam Just leave out the first import line and the "assemblySettings" line; that works for me.