Spark Streaming Exception: java.util.NoSuchElementException: None.get
The error is due to trying to run multiple spark contexts at the same time. Setting allowMultipleContexts
to true is mostly used for testing purposes and it's use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext
is used everywhere. From the code we can see that the SparkContext
(sc
) is used to create a SQLContext
which is fine. However, when creating the StreamingContext
it is not used, instead the SparkConf
is used.
By looking at the documentation we see:
Create a StreamingContext by providing the configuration necessary for a new SparkContext
In other words, by using SparkConf
as parameter a new SparkContext
will be created. Now there are two separate contexts.
The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext
to:
val ssc = new StreamingContext(sc, Seconds(20))
Note: In newer versions of Spark (2.0+) use SparkSession
instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...)
. It can look as follows:
val spark = SparkSession().builder
.setMaster("local[*]")
.setAppName("SparkKafka")
.getOrCreate()
import sqlContext.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
Related videos on Youtube
andani
Success is no accident. It is hard work, perseverance, learning, studying, sacrifice and most of all, love of what you are doing or learning to do
Updated on June 04, 2022Comments
-
andani almost 2 years
I am writing
SparkStreaming
data to HDFS by converting it to a dataframe:Code
object KafkaSparkHdfs { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka") sparkConf.set("spark.driver.allowMultipleContexts", "true"); val sc = new SparkContext(sparkConf) def main(args: Array[String]): Unit = { val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val ssc = new StreamingContext(sparkConf, Seconds(20)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "stream3", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("fridaydata") val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) val lines = stream.map(consumerRecord => consumerRecord.value) val words = lines.flatMap(_.split(" ")) val wordMap = words.map(word => (word, 1)) val wordCount = wordMap.reduceByKey(_ + _) wordCount.foreachRDD(rdd => { val dataframe = rdd.toDF(); dataframe.write .mode(SaveMode.Append) .save("hdfs://localhost:9000/newfile24") }) ssc.start() ssc.awaitTermination() } }
The folder is created but the file is not written.
The program is getting terminated with the following error:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343) at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289) at java.lang.Thread.run(Thread.java:748) 18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343) at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
In my pom I am using respective dependencies:
- spark-core_2.11
- spark-sql_2.11
- spark-streaming_2.11
- spark-streaming-kafka-0-10_2.11
-
andani almost 6 yearsi have 2 datanodes, and my data is just 1mb file @user9985951