Spark Streaming Exception: java.util.NoSuchElementException: None.get


The error is due to trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly meant for testing purposes and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create a SQLContext, which is fine. However, it is not used when creating the StreamingContext; the SparkConf is passed instead.

By looking at the documentation we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, by passing the SparkConf as the parameter, a new SparkContext is created. There are now two separate contexts.

The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext to:

val ssc = new StreamingContext(sc, Seconds(20))
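
For completeness, the surrounding setup could then look like this (a sketch based on the code in the question; with a single context the spark.driver.allowMultipleContexts workaround is no longer needed):

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
// One context for the whole application; no allowMultipleContexts setting required
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Reuse the existing SparkContext instead of passing the SparkConf
val ssc = new StreamingContext(sc, Seconds(20))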

Note: in newer versions of Spark (2.0+), use SparkSession instead. A new streaming context can then be created with StreamingContext(spark.sparkContext, ...). It can look as follows:

val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
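
The rest of the pipeline from the question can then stay as it is; the foreachRDD write goes through the same single context, for example (a sketch reusing wordCount and the HDFS path from the question):

wordCount.foreachRDD { rdd =>
  // toDF() resolves via spark.implicits._ and is backed by the one SparkContext
  val dataframe = rdd.toDF()
  dataframe.write
    .mode(SaveMode.Append)
    .save("hdfs://localhost:9000/newfile24")
}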


Comments

  • andani almost 2 years

    I am writing Spark Streaming data to HDFS by converting it to a dataframe:

    Code

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    object KafkaSparkHdfs {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
      sparkConf.set("spark.driver.allowMultipleContexts", "true");
      val sc = new SparkContext(sparkConf)
    
      def main(args: Array[String]): Unit = {
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
    
        val ssc = new StreamingContext(sparkConf, Seconds(20))
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "stream3",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("fridaydata")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
        )
    
        val lines = stream.map(consumerRecord => consumerRecord.value)
        val words = lines.flatMap(_.split(" "))
        val wordMap = words.map(word => (word, 1))
        val wordCount = wordMap.reduceByKey(_ + _)
    
        wordCount.foreachRDD(rdd => {
          val dataframe = rdd.toDF(); 
          dataframe.write
            .mode(SaveMode.Append)
            .save("hdfs://localhost:9000/newfile24")     
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    The folder is created but the file is not written.

    The program is getting terminated with the following error:

        18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
        java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
        at java.lang.Thread.run(Thread.java:748)
        18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    

    In my pom I am using the following dependencies:

    • spark-core_2.11
    • spark-sql_2.11
    • spark-streaming_2.11
    • spark-streaming-kafka-0-10_2.11
  • andani almost 6 years
    I have 2 datanodes, and my data is just a 1 MB file @user9985951