How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?
MetadataFetchFailedException is thrown when a MapOutputTracker on an executor could not find the requested shuffle map outputs for partitions in its local cache and tried to fetch them remotely from the driver's MapOutputTracker.
That could lead to a few conclusions:
- Memory issues on the driver
- Memory issues on the executors
- Executors being lost
Please review the logs for "Executor lost" INFO messages and/or check the web UI's Executors page to see how the executors are doing.
The root cause of executors being lost may also be that the cluster manager decided to kill ill-behaved executors, i.e. ones that may have used more memory than they requested.
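If the executors are indeed being killed for exceeding their memory allocation, it may help to give them more memory and/or to spread the shuffle over more (and therefore smaller) partitions. The sketch below only shows the kind of settings involved; the values are purely illustrative, and the exact property names depend on your Spark version and cluster manager:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; tune them to your cluster and data size.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.executor.memory", "4g")                // heap available to each executor
  .set("spark.yarn.executor.memoryOverhead", "1024") // extra off-heap headroom on YARN, in MB
  .set("spark.default.parallelism", "400")           // more, smaller partitions for RDD shuffles
  .set("spark.sql.shuffle.partitions", "400")        // the same idea for DataFrame/SQL shuffles
val sc = new SparkContext(conf)

The same properties can also be passed on the spark-submit command line with --conf.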
See the other question FetchFailedException or MetadataFetchFailedException when processing big data set for more insights.
Comments
-
Stefan Falk about 2 years
If I increase the model size of my word2vec model I start to get this kind of exception in my log:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
  at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
  at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
  at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
I tried to write my own "save model" version which looks like this:
def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
  println("Saving model as CSV ..")

  val vectorSize = model.getVectors.values.head.size
  println("vectorSize=" + vectorSize)

  val SEPARATOR_TOKEN = " "

  val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }
  println("Got dataArray ..")

  println("parallelize(dataArray, 10)")
  val par = sc.parallelize(dataArray, 10)
    .map(d => {
      val sb = new mutable.StringBuilder()
      sb.append(d.word)
      sb.append(SEPARATOR_TOKEN)
      for (v <- d.vector) {
        sb.append(v)
        sb.append(SEPARATOR_TOKEN)
      }
      sb.setLength(sb.length - 1)
      sb.append("\n")
      sb.toString()
    })

  println("repartition(1)")
  val rep = par.repartition(1)

  println("collect()")
  val vectorsAsString = rep.collect()
  println("Collected serialized vectors ..")

  val cfile = new mutable.StringBuilder()
  cfile.append(vectorsAsString.length)
  cfile.append(" ")
  cfile.append(vectorSize)
  cfile.append("\n")

  val sb = new StringBuilder
  sb.append("word,")
  for (i <- 0 until vectorSize) {
    sb.append("v")
    sb.append(i.toString)
    sb.append(",")
  }
  sb.setLength(sb.length - 1)
  sb.append("\n")

  for (vectorString <- vectorsAsString) {
    sb.append(vectorString)
    cfile.append(vectorString)
  }

  println("Saving file to " + new Path(path, "data").toUri.toString)
  sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path + ".csv", "data").toUri.toString)
  sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path + ".cs", "data").toUri.toString)
}
Apparently it behaves just like their current implementation, in that it doesn't work either.
I'd just like to get a word2vec model saved. It works with small files, but not once the model gets larger.
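(Not part of the original comment.) One way to sidestep the repartition(1) + collect() pattern above, which funnels the whole model through a single task and then through the driver, is to let Spark write the part files directly. A rough sketch, assuming the MLlib Word2VecModel API and a purely illustrative output layout:

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.Word2VecModel

// Serialize each (word, vector) pair on the executors and let Spark write the
// output in parallel, instead of collecting all vectors on the driver.
def saveVectorsAsText(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
  val sep = " "
  sc.parallelize(model.getVectors.toSeq, numSlices = 10)
    .map { case (word, vector) => (word +: vector.map(_.toString)).mkString(sep) }
    .saveAsTextFile(new Path(path, "data").toUri.toString)
}

This keeps the data distributed right up to the filesystem write, at the cost of producing multiple part files instead of a single CSV.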
-
Sushil Verma almost 2 years
repartition(10) worked for me
-
Jacek Laskowski almost 2 years
@SushilVerma Can you show the query you used? Did repartition(10) help before or after groupBy? How do you submit your Spark app?
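For readers unsure what the last question is getting at, here is a hypothetical illustration of the two placements; the data and names are made up and not taken from the comments:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("repartition-placement"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// repartition(10) before the wide operation: the groupBy shuffle then
// reads from (and, by default, produces) 10 partitions.
val before = pairs.repartition(10).groupByKey().mapValues(_.sum)

// repartition(10) after the wide operation: it only changes how the
// already aggregated result is partitioned.
val after = pairs.groupByKey().mapValues(_.sum).repartition(10)

The placement matters because only the first variant changes how the groupBy shuffle itself is executed.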