How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

MetadataFetchFailedException is thrown when a MapOutputTracker on an executor cannot find the requested shuffle map outputs in its local cache and the locations fetched remotely from the driver's MapOutputTracker turn out to be missing as well.

That points to a few possible causes:

  1. The driver's memory issues
  2. The executors' memory issues
  3. Executors being lost

Review the logs for "Executor lost" INFO messages and/or check the web UI's Executors page to see how the executors are doing.

The root cause of executors being lost may also be that the cluster manager has decided to kill ill-behaved executors (that may have used up more memory than requested).
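
If memory pressure is the culprit, the usual first step is to give the executors (and the driver) more headroom. A minimal sketch with placeholder values, assuming Spark on YARN; tune the numbers for your cluster:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only. Driver memory normally has to be set at submit
    // time (spark-submit --driver-memory), before the driver JVM starts.
    val conf = new SparkConf()
      .setAppName("word2vec-training")
      .set("spark.executor.memory", "8g")                 // more heap per executor
      .set("spark.yarn.executor.memoryOverhead", "2048")  // extra off-heap headroom in MB (YARN)

    val sc = new SparkContext(conf)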

See the related question "FetchFailedException or MetadataFetchFailedException when processing big data set" for more insights.

Comments

  • Stefan Falk (about 2 years ago)

    If I increase the size of my word2vec model, I start getting this kind of exception in my log:

    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    I tried to write my own "save model" version which looks like this:

      // Imports and the small Data helper are assumed here; they are not shown
      // in the original snippet.
      import org.apache.hadoop.fs.Path
      import org.apache.spark.SparkContext
      import org.apache.spark.mllib.feature.Word2VecModel
      import scala.collection.mutable

      case class Data(word: String, vector: Array[Float])

      def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
    
        println("Saving model as CSV ..")
    
        val vectorSize = model.getVectors.values.head.size
    
        println("vectorSize="+vectorSize)
    
        val SEPARATOR_TOKEN = " "    
        val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }
    
        println("Got dataArray ..")
        println("parallelize(dataArray, 10)")
        val par = sc.parallelize(dataArray, 10)
              .map(d => {
    
                val sb = new mutable.StringBuilder()
                sb.append(d.word)
                sb.append(SEPARATOR_TOKEN)
    
                for(v <- d.vector) {
                  sb.append(v)
                  sb.append(SEPARATOR_TOKEN)
                }
                sb.setLength(sb.length - 1)
                sb.append("\n")
                sb.toString()
              })
        println("repartition(1)")
        val rep = par.repartition(1)
        println("collect()")
        val vectorsAsString = rep.collect()
    
        println("Collected serialized vectors ..")    
    
        val cfile = new mutable.StringBuilder()
    
        cfile.append(vectorsAsString.length)
        cfile.append(" ")
        cfile.append(vectorSize)
        cfile.append("\n")
    
        val sb = new StringBuilder
        sb.append("word,")
        for(i <- 0 until vectorSize) {
          sb.append("v")
          sb.append(i.toString)
          sb.append(",")
        }
        sb.setLength(sb.length - 1)
        sb.append("\n")
    
        for(vectorString <- vectorsAsString) {
          sb.append(vectorString)
          cfile.append(vectorString)
        }
    
        println("Saving file to " + new Path(path, "data").toUri.toString)
        sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path+".csv", "data").toUri.toString)
        sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path+".cs", "data").toUri.toString)
      }
    

    Apparently it behaves just like the built-in implementation, which is to say it doesn't work either.

    All I'd like is a saved word2vec model. It works with small files but fails once the model gets larger.
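
    As a rough sketch (assuming the mllib Word2VecModel API; the saveDistributed name is just illustrative), the same vectors could also be written out without the repartition(1) plus collect() round trip through the driver:

      import org.apache.hadoop.fs.Path
      import org.apache.spark.SparkContext
      import org.apache.spark.mllib.feature.Word2VecModel

      // Sketch only: serialize the vectors on the executors and write them out
      // directly, instead of collecting the strings back onto the driver.
      def saveDistributed(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
        val vectors = model.getVectors                    // Map[String, Array[Float]]
        val vectorSize = vectors.values.head.length
        // Header line "<word count> <vector size>" as used by the word2vec text format
        val header = sc.parallelize(Seq(s"${vectors.size} $vectorSize"), 1)
        val lines = sc.parallelize(vectors.toSeq, 10).map { case (word, vector) =>
          (word +: vector.map(_.toString)).mkString(" ")
        }
        // coalesce(1) still funnels the write through a single task, but nothing
        // has to fit into the driver's memory via collect()
        (header ++ lines).coalesce(1).saveAsTextFile(new Path(path, "data").toUri.toString)
      }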

  • Sushil Verma (almost 2 years ago)
    repartition(10) worked for me.
  • Jacek Laskowski (almost 2 years ago)
    @SushilVerma Can you show the query you used? Did repartition(10) help before or after groupBy? How do you submit your Spark app?