"sparkContext was shut down" while running spark on a large dataset


Solution 1

Found the answer.

My table was saved as a 20 GB Avro file, and when the executors tried to open it, each of them had to load the full 20 GB into memory. I solved it by using CSV instead of Avro.
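As a rough illustration of the fix, the table can be re-exported as CSV and read with the spark-csv data source instead of the Avro file. This is only a sketch: the path, the app name, and the com.databricks:spark-csv dependency are assumptions, not part of the original setup.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Minimal sketch for Spark 1.x; assumes the spark-csv package is on the
    // classpath (e.g. --packages com.databricks:spark-csv_2.10:1.2.0) and that
    // the table was re-exported to a hypothetical CSV directory.
    val sc = new SparkContext(new SparkConf().setAppName("read-csv-instead-of-avro"))
    val sqlContext = new SQLContext(sc)

    val features = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")      // first line holds the column names
      .load("/data/my_table_csv")    // placeholder path to the exported CSV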

Solution 2

The symptoms are typical of an OutOfMemory error in one of the executor tasks. Try increasing the executor memory when launching the job; see the --executor-memory parameter of spark-submit, spark-shell, etc. The default value is 1G.
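For example, a spark-submit invocation along these lines raises each executor's heap above the default. The sizes are placeholders; on YARN, keep in mind that executor memory plus spark.yarn.executor.memoryOverhead has to fit inside the container limit.

    # Hypothetical sizes; tune them to what your YARN containers actually allow.
    /opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit \
      --master yarn-client \
      --num-executors 10 \
      --executor-cores 4 \
      --executor-memory 8g \
      --class sparkTesting.Runner \
      myJar.jar jarArguments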

Solution 3

Another possible cause of the "SparkContext is shutdown" error is that you are importing a jar file after evaluating some other code. (This may only happen in Spark Notebook.)

To fix the problem, move all your :cp myjar.jar statements to the start of your file.
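For instance, in a Spark Notebook the jar would be added in the very first cell, before any other code is evaluated. The import and class name below are only illustrative of something packaged inside myjar.jar.

    :cp myjar.jar

    // Only after the jar is on the classpath, evaluate code that uses it.
    import sparkTesting.Parser   // hypothetical class provided by myjar.jar
    val parser = new Parser()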

Author: Aleksander Zendel, updated on December 08, 2020

Comments

  • Aleksander Zendel, over 3 years ago

    When running a Spark job on a cluster past a certain data size (~2.5 GB) I get either "Job cancelled because SparkContext was shut down" or "executor lost". Looking at the YARN GUI, I see that the job that got killed is shown as successful. There are no problems when running on 500 MB of data. While looking for a solution I found that "it seems yarn kills some of the executors as they request more memory than expected."

    Any suggestions how to debug it?

    The command I submit my Spark job with:

    /opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit  --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6  --class sparkTesting.Runner   --master yarn-client myJar.jar jarArguments
    

    and the SparkContext settings:

    val sparkConf = (new SparkConf()
        .set("spark.driver.maxResultSize", "21g")
        .set("spark.akka.frameSize", "2011")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", configVar.sparkLogDir)
        )
    

    Simplified code that fails looks like this:

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val broadcastParser = sc.broadcast(new Parser())
    
    val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles)
    val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser))
    
    val allWords= featuresRdd
      .flatMap(line => line.split(" "))
      .count
    
    val wordQuantiles= featuresRdd
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map(pair => (pair._2 , pair._2))
      .reduceByKey(_+_)
      .sortBy(_._1)
      .collect
      .scanLeft((0,0.0)) ( (res,add) => (add._1, res._2+add._2) )
      .map(entry => (entry._1,entry._2/allWords))
    
    val dictionary = featuresRdd
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _) // here I have Rdd of word,count tuples
      .filter(_._2 >= moreThan)
      .filter(_._2 <= lessThan)
      .filter(_._1.trim!=(""))
      .map(_._1)
      .zipWithIndex
      .collect
      .toMap
    

    And the error stack:

    Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
    at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
    at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
    at sparkTesting.Runner$.main(Runner.scala:133)
    at sparkTesting.Runner.main(Runner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    
  • Josiah Yoder, almost 4 years ago
    Please suggest ways to improve this post before downvoting it. Thank you!